You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/08 13:06:19 UTC

[GitHub] [iceberg] ottensjors opened a new issue, #6388: Spark Structured Streaming - Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null

ottensjors opened a new issue, #6388:
URL: https://github.com/apache/iceberg/issues/6388

   ### Apache Iceberg version
   
   1.0.0
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   Hi team,
   
   i'm currently using apache spark 3.3 with apache-iceberg 1.0.0 and AWS S3 and GlueCatalog-integration. please find my spark-config below:
   
   ```
   # add Iceberg dependency
   ICEBERG_VERSION=1.0.0
   DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:$ICEBERG_VERSION"
   
   DEPENDENCIES+=",org.apache.iceberg:iceberg-hive-runtime:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2"
   
   # add AWS dependnecy
   AWS_SDK_VERSION=2.17.257
   AWS_MAVEN_GROUP=software.amazon.awssdk
   AWS_PACKAGES=(
       "bundle"
       "url-connection-client"
   )
   for pkg in "${AWS_PACKAGES[@]}"; do
       DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
   done
   
   
   # start Spark SQL client shell --> works with aws glue catalog
   
   spark-shell --packages $DEPENDENCIES \
       --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.my_catalog.warehouse=s3://<s3-bucket>/adhoc-otten/iceberg \
       --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
       --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.hive.metastore.version=2.3.9 \
       --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
       --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory    
   ```
   the spark-session has successfully integrated with Glue catalog and AWS S3. which is great. I'm able to use all functionallity when it comes to regular dataframes.
   
   However, I'm currently evaluating apache iceberg with spark structured streaming. Unfortunately i'm running into an issue. below you can find the code i use to eveluate it:
   
   ```
   val df = spark.readStream.format("iceberg").option("stream-from-timestamp",1670487707799L).option("streaming-skip-overwrite-snapshots","true").load("<table_name>")
   
   df: org.apache.spark.sql.DataFrame = [ITEM_ID: int, ITEM_KEY: bigint ... 5 more fields]
   
   df.writeStream.format("iceberg").outputMode("append").format("iceberg").option("path","default.dimitem_stream").option("checkpointLocation","s3a://<bucket-name>/adhoc-otten/iceberg/default/dimitem_stream/checkpoint").start()
   
   res4: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@22dac77c
   ```
   it all seems to go well. but after a few seconds the following error manifests itself:
   
   ```
   22/12/08 14:00:37 ERROR MicroBatchExecution: Query [id = 4db07e32-5e9e-44d9-8a89-7d1c0f64e716, runId = ce0a9e92-efff-4be8-b9f7-ab8ae9ac179c] terminated with error
   org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
   	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
   	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
   	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:657)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:647)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
   	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null
   	at org.apache.iceberg.spark.source.SparkMicroBatchStream.shouldProcess(SparkMicroBatchStream.java:230)
   	at org.apache.iceberg.spark.source.SparkMicroBatchStream.planFiles(SparkMicroBatchStream.java:211)
   	at org.apache.iceberg.spark.source.SparkMicroBatchStream.planInputPartitions(SparkMicroBatchStream.java:148)
   	at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions$lzycompute(MicroBatchScanExec.scala:45)
   	at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions(MicroBatchScanExec.scala:45)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:142)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:141)
   	at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:29)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:153)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
   	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
   	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
   	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
   	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
   	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
   	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
   	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
   	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
   	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
   	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
   	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
   	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
   	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
   	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
   	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
   	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
   	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
   ```
   
   the main takeaway here is the following line:
   
   `Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null`
   
   it's probably me doing something stupid and being PICNIC (Problem In Chair Not In Computer).
   
   However i went to the sourcecode of apache iceberg and was unable to find any assertion or message that reflects the above statement or any additional information on the internet.
   
   I was wondering if someone could provide me with some much needed insights to get this resolved.
   
   Thanks in advance!
   
   regards,
   
   Sjors Otten
   principal architect
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on issue #6388: Spark Structured Streaming - Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on issue #6388:
URL: https://github.com/apache/iceberg/issues/6388#issuecomment-1362366455

   Hey Sjors,
   
   Is this happening while snapshot expiration is being performed on the table you're reading from? From my reading of the code this error will happen like this:
   
   1.) [The snapshot ID for current offset no longer exists (my hunch is due to expiration)](https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L210). So table.snapshot(currentOffset.snapshotId()) returns null. 
   
   2.) Then we throw an [NPE](https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L229) here when trying to get the operation associated with the snapshot.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 closed issue #6388: Spark Structured Streaming - Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 closed issue #6388: Spark Structured Streaming - Cannot invoke "org.apache.iceberg.Snapshot.operation()" because "snapshot" is null
URL: https://github.com/apache/iceberg/issues/6388


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org