You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2019/09/09 11:40:53 UTC
[GitHub] [incubator-iceberg] shawnding opened a new issue #464: add optional
stringType column after a new long type column ,
when write data get an exception
shawnding opened a new issue #464: add optional stringType column after a new long type column ,when write data get an exception
URL: https://github.com/apache/incubator-iceberg/issues/464
**step1:** use the api '_table.updateSchema.addColumn()_' , add a new column
import org.apache.spark.sql.SparkSession
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types._
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.log4j.Logger
import java.sql.Timestamp
val schema = new Schema(
NestedField.optional(1, "id", LongType.get())
)
val spec = PartitionSpec.builderFor(schema).identity("id").build()
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
val table_name = "iceberg_partition_test_120"
val name = TableIdentifier.of("default", table_name)
val table = catalog.createTable(name, schema, spec)
import spark.implicits._
case class Record(id: Int)
val recordsDF = spark.createDataFrame((1 to 3).map(i => Record(i)))
recordsDF.write.format("iceberg").mode("append").save(s"default.${table_name}")
spark.read.format("iceberg").load(s"default.${table_name}").show
//----show return ----
+---+
| id|
+---+
| 1|
| 2|
| 3|
+---+
table.updateSchema().addColumn("phone", LongType.get()).commit();
val recordsDF = spark.createDataFrame((4 to 6).map(i => Record(i)))
recordsDF.write.format("iceberg").mode("append").save(s"default.${table_name}")
spark.read.format("iceberg").load(s"default.${table_name}").show
//----show return ----( then some dirty numbers in the new column )
+---+-----+
| id|phone|
+---+-----+
| 4| 5| ( dirty data)
| 5| 5| ( dirty data)
| 6| 5| (dirty data )
| 1| null|
| 2| null|
| 3| null|
+---+-----+
table.updateSchema().addColumn("name", StringType.get()).commit();
val recordsDF = spark.createDataFrame((7 to 9).map(i => Record(i)))
recordsDF.write.format("iceberg").mode("append").save(s"default.${table_name}")
//----show return ----( return an Exception)
**the error msg is :**
`Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
... 69 more
Caused by: java.lang.NegativeArraySizeException
at org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:297)
at org.apache.iceberg.spark.data.SparkParquetWriters$UTF8StringWriter.write(SparkParquetWriters.java:246)
at org.apache.iceberg.spark.data.SparkParquetWriters$UTF8StringWriter.write(SparkParquetWriters.java:239)
at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:264)
at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:446)
at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:116)
at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:417)
at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:370)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException
at org.apache.iceberg.shaded.org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:241)
at org.apache.iceberg.shaded.org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetMetadata(ParquetMetadataConverter.java:129)
at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:690)
at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:684)
at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:193)
at org.apache.iceberg.spark.source.Writer$PartitionedWriter.abort(Writer.java:435)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$1.apply$mcV$sp(WriteToDataSourceV2Exec.scala:150)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1403)
... 11 more`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org