Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/22 21:53:47 UTC

[GitHub] [iceberg] kbendick commented on issue #2962: Parquet 1.11.1 update causes regressions while reading iceberg data written with v1.11.0

kbendick commented on issue #2962:
URL: https://github.com/apache/iceberg/issues/2962#issuecomment-925357611


   Hi @prodeezy @hankfanchiu, sorry for the delay in getting back to you on this. I was looking into a different Parquet issue, [PARQUET-2078](https://issues.apache.org/jira/browse/PARQUET-2078), so my apologies for overlooking this one.
   
   I'm also able to reproduce this using Spark 3.1.2, going from Iceberg 0.9.0 (compile only parquet@1.11.0) to Iceberg 0.11.0 (compile only parquet@1.11.1).
   
   I get the same error:
   ```
   java.lang.IllegalArgumentException: [mapCol, key_value, key] required binary key (STRING) = 2 is not in the store: [] 12
   	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:231)
   	at org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:154)
   	at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:487)
   	at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:643)
   	at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:139)
   	at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:110)
   	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:69)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
   	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   Here's my minimal reproduction:
   
   I downloaded the Spark tarball for 3.1.2 from https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz, untarred it, and created a test table similar to yours.
   
   **Note that the Spark 3.1.2 distribution bundles Parquet 1.10.1 by default, so I removed the Parquet jars from /opt/spark/jars:**
   ```bash
   # Note that this is parquet 1.10.1 using Apache Spark 3.1.2.
   root@spark:/opt/spark# ls -la jars | grep parquet
   parquet-column-1.10.1.jar  parquet-common-1.10.1.jar  parquet-encoding-1.10.1.jar  parquet-format-2.4.0.jar  parquet-hadoop-1.10.1.jar  parquet-jackson-1.10.1.jar
   
   root@spark:/opt/spark# rm -f jars/parquet-*
   
   root@spark-box:/opt/spark# ./bin/spark-shell     --packages org.apache.iceberg:iceberg-spark3-runtime:0.9.0    --driver-memory 2g     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog     --conf spark.sql.catalog.spark_catalog.type=hive      --conf spark.hadoop.hive.metastore.uris=thrift://hive:9083
   ```
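   
   As a quick check after the `rm` above, a sketch like the following could confirm that no bundled Parquet jars remain in the jars directory before launching `spark-shell`. `list_parquet_jars` is a hypothetical helper name for illustration, not something shipped with Spark or Iceberg.
   
   ```bash
   # Print the file names of any Parquet jars found directly under a jars directory.
   # Prints nothing when no Parquet jars are present.
   # NOTE: list_parquet_jars is a hypothetical helper, not part of Spark or Iceberg.
   list_parquet_jars() {
     local jars_dir="$1"
     find "$jars_dir" -maxdepth 1 -type f -name 'parquet-*.jar' -exec basename {} \; | sort
   }
   
   # Example usage (the path is the one used in this reproduction):
   #   list_parquet_jars /opt/spark/jars
   ```
   
   Empty output means the only Parquet classes left on the classpath should be the ones shaded into the Iceberg runtime jar (the `org.apache.iceberg.shaded.org.apache.parquet` package seen in the stack trace).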
   
   ```scala
   scala>  import spark.implicits._
   import spark.implicits._
   
   scala> import org.apache.spark.sql._
   import org.apache.spark.sql._
   
   scala> import org.apache.spark.sql.functions._
   import org.apache.spark.sql.functions._
   
   scala> import org.apache.spark.sql.expressions._
   import org.apache.spark.sql.expressions._
   
   scala>  spark.sql("CREATE TABLE IF NOT EXISTS test_parquet_map_regression_iceberg_090(mapCol MAP<STRING, STRUCT<payload: STRUCT<bool: BOOLEAN, dbl: DOUBLE, str: STRING>, str: STRING>>) USING ICEBERG TBLPROPERTIES('type'='hive')")
   
   scala> :paste
   // Entering paste mode (ctrl-D to finish)
   
       var df = spark.range(NUM_ROWS)
         .withColumnRenamed("id", "longCol")
         .withColumn("intCol", expr("CAST(longCol AS INT)"))
         .withColumn("dbl", expr("CAST(longCol AS DOUBLE)"))
         .withColumn("str", expr("CAST(longCol AS STRING)"))
         .withColumn("bool", expr("IF(intCol % 2 = 0, true, false)"))
         .withColumn("payload", struct($"bool", $"dbl", $"str"))
         .withColumn("value", struct($"payload", $"str"))
         .withColumn("mapCol", map($"str", $"value"))
         .select("mapCol")
   
   // Exiting paste mode, now interpreting.
   
   df: org.apache.spark.sql.DataFrame = [mapCol: map<string,struct<payload:struct<bool:boolean,dbl:double,str:string>,str:string>>]
   
   scala> df.writeTo("default.test_parquet_map_regression_iceberg_090").append
   
   scala>  spark.table("default.test_parquet_map_regression_iceberg_090").show(false)
   +-------------------------------+
   |mapCol                         |
   +-------------------------------+
   |{0 -> {{true, 0.0, 0}, 0}}     |
   |{1 -> {{false, 1.0, 1}, 1}}    |
   |{2 -> {{true, 2.0, 2}, 2}}     |
   |{3 -> {{false, 3.0, 3}, 3}}    |
   |{4 -> {{true, 4.0, 4}, 4}}     |
   |{5 -> {{false, 5.0, 5}, 5}}    |
   |{6 -> {{true, 6.0, 6}, 6}}     |
   |{7 -> {{false, 7.0, 7}, 7}}    |
   |{8 -> {{true, 8.0, 8}, 8}}     |
   |{9 -> {{false, 9.0, 9}, 9}}    |
   |{10 -> {{true, 10.0, 10}, 10}} |
   |{11 -> {{false, 11.0, 11}, 11}}|
   |{12 -> {{true, 12.0, 12}, 12}} |
   |{13 -> {{false, 13.0, 13}, 13}}|
   |{14 -> {{true, 14.0, 14}, 14}} |
   |{15 -> {{false, 15.0, 15}, 15}}|
   |{16 -> {{true, 16.0, 16}, 16}} |
   |{17 -> {{false, 17.0, 17}, 17}}|
   |{18 -> {{true, 18.0, 18}, 18}} |
   |{19 -> {{false, 19.0, 19}, 19}}|
   +-------------------------------+
   only showing top 20 rows
   
   scala> :quit
   ```
   
   I then upgraded from Iceberg 0.9.0 (compiled against Parquet 1.11.0) to Iceberg 0.10.0 (compiled against Parquet 1.11.1).
   
   I restarted `spark-shell` with the Iceberg 0.10.0 runtime:
   ```bash
   root@spark-box:/opt/spark#  ./bin/spark-shell     --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0    --driver-memory 2g     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog     --conf spark.sql.catalog.spark_catalog.type=hive      --conf spark.hadoop.hive.metastore.uris=thrift://hive-box:9083
   ```
   
   First, I tried writing a new table (to ensure that data written with the new version can be read back).
   ```scala
   scala>  spark.sql("CREATE TABLE IF NOT EXISTS test_parquet_map_regression_iceberg_010(mapCol MAP<STRING, STRUCT<payload: STRUCT<bool: BOOLEAN, dbl: DOUBLE, str: STRING>, str: STRING>>) USING ICEBERG TBLPROPERTIES('type'='hive')")
   
   ... create df as above ....
   
   scala> spark.table("default.test_parquet_map_regression_iceberg_010").show(false)
   +-------------------------------+
   |mapCol                         |
   +-------------------------------+
   |{0 -> {{true, 0.0, 0}, 0}}     |
   |{1 -> {{false, 1.0, 1}, 1}}    |
   |{2 -> {{true, 2.0, 2}, 2}}     |
   |{3 -> {{false, 3.0, 3}, 3}}    |
   |{4 -> {{true, 4.0, 4}, 4}}     |
   |{5 -> {{false, 5.0, 5}, 5}}    |
   |{6 -> {{true, 6.0, 6}, 6}}     |
   |{7 -> {{false, 7.0, 7}, 7}}    |
   |{8 -> {{true, 8.0, 8}, 8}}     |
   |{9 -> {{false, 9.0, 9}, 9}}    |
   |{10 -> {{true, 10.0, 10}, 10}} |
   |{11 -> {{false, 11.0, 11}, 11}}|
   |{12 -> {{true, 12.0, 12}, 12}} |
   |{13 -> {{false, 13.0, 13}, 13}}|
   |{14 -> {{true, 14.0, 14}, 14}} |
   |{15 -> {{false, 15.0, 15}, 15}}|
   |{16 -> {{true, 16.0, 16}, 16}} |
   |{17 -> {{false, 17.0, 17}, 17}}|
   |{18 -> {{true, 18.0, 18}, 18}} |
   |{19 -> {{false, 19.0, 19}, 19}}|
   +-------------------------------+
   only showing top 20 rows
   ```
   
   So Parquet 1.11.1 data can be written and then read.
   
   However, the older data written with Parquet 1.11.0 cannot be read on the newer Iceberg version.
   
   ```scala
   scala> spark.table("default.test_parquet_map_regression_iceberg_090").show(false)
   21/09/22 21:49:24 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
   java.lang.IllegalArgumentException: [mapCol, map, key] required binary key (STRING) = 2 is not in the store: [] 12
   	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:231)
   	at org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:185)
   	at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:529)
   	at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:685)
   	at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:142)
   	at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:112)
   	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:81)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
   	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   So this issue has actually been present for this particular map data since the Iceberg 0.9.0 -> Iceberg 0.10.0 upgrade (which is when we went from Parquet 1.11.0 -> Parquet 1.11.1).
   
   I also tried with Iceberg 0.12.0, and as expected, the data written from Iceberg 0.10.0 is readable, but the data from Iceberg 0.9.0 (Parquet 1.11.0) is not readable.
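   
   To tell which data files in a table were produced by which Parquet writer, one rough option is to look for the writer string embedded in each file's footer. The sketch below assumes the writer recorded a `created_by` value of the form `parquet-mr version X.Y.Z (build ...)` and that the footer is not encrypted; `parquet_writer_version` is a hypothetical helper name, and the file path in the usage comment is a placeholder.
   
   ```bash
   # Print the distinct "parquet-mr version X.Y.Z" strings embedded in a Parquet file.
   # Assumption: the writer stored such a string in the footer's created_by field.
   # NOTE: parquet_writer_version is a hypothetical helper, not an Iceberg or Parquet tool.
   parquet_writer_version() {
     local file="$1"
     # -a treats the binary file as text; -o prints only the matching part.
     # LC_ALL=C avoids locale-dependent handling of non-UTF-8 bytes.
     LC_ALL=C grep -a -o 'parquet-mr version [0-9][0-9.]*' "$file" | sort -u
   }
   
   # Example usage (placeholder path):
   #   parquet_writer_version /path/to/data-file.parquet
   ```
   
   This only inspects raw bytes, so it is a heuristic rather than a proper footer parse, but it can help separate files written before the upgrade from files written after it.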
   
   We should investigate whether there's a fix in the upcoming Parquet 1.12.1.
   
   I'm also going to check to see if this issue affects OSS Spark and not just Iceberg (I suspect that it does but will double check).
   
   cc @rdblue 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


