Posted to issues@iceberg.apache.org by "adigerber (via GitHub)" <gi...@apache.org> on 2023/03/28 14:00:32 UTC

[GitHub] [iceberg] adigerber opened a new issue, #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

adigerber opened a new issue, #7226:
URL: https://github.com/apache/iceberg/issues/7226

   ### Apache Iceberg version
   
   1.2.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   According to the documentation, when using Iceberg, one should set `spark.sql.extensions` to `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`, but setting this property seems to cause an exception to be thrown when trying to write to an Iceberg table using Spark structured streaming.
   
   The exception that is thrown is:
   
   ```
   Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
   === Streaming Query ===
   Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
   Current Committed Offsets: {}
   Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
   +- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]
   ```
   
   Code to reproduce:
   
   ```scala
   package com.example
   
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.streaming.Trigger
   import org.apache.spark.sql.{Encoders, SparkSession}
   
   import java.nio.file.Files
   import java.sql.Timestamp
   
   case class Bla(ts: Timestamp, a: String, b: Double)
   
   object MinEx {
     def main(args: Array[String]): Unit = {
       val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
       val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString
       val spark = SparkSession.builder()
         .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
         .config("spark.sql.catalog.spark_catalog.type", "hadoop")
         .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
         .config("spark.sql.warehouse.dir", warehouseDir)
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .appName("BugRepro")
         .master("local[*]")
         .enableHiveSupport()
         .getOrCreate()
   
       spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")
   
       implicit val sqlContext = spark.sqlContext
       implicit val encoder = Encoders.product[Bla]
       val memStream = MemoryStream[Bla]
       val now = System.currentTimeMillis()
       val day = 86400000
       memStream.addData(List(
         Bla(new Timestamp(now), "test", 12.34),
         Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
         Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
         Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
       ))
   
       memStream.toDF()
         .writeStream
         .format("iceberg")
         .outputMode("append")
         .option("path", "test_iceberg_table")
         .option("fanout-enabled", true)
         .option("checkpointLocation", checkpointDir)
         .trigger(Trigger.Once())
         .start()
         .awaitTermination()
     }
   }
   ```
   
   The code works as expected when the statement that configures `spark.sql.extensions` is commented out.


[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1668836893

   same issue for me


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1596258379

   Able to reproduce this using:
   
   ```sh
   rm -rf /tmp/warehouse
   rm -rf /tmp/spark-checkpoint
   
   mkdir -p /tmp/warehouse
   mkdir -p /tmp/spark-checkpoint
   
   ./bin/spark-shell \
    --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=/tmp/warehouse \
    --conf spark.sql.warehouse.dir=/tmp/warehouse \
    --conf spark.sql.defaultCatalog=demo 
   ```
   
   ```scala
   spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")
   
   
   import java.nio.file.Files
   import java.sql.Timestamp
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.streaming.Trigger
   import org.apache.spark.sql.{Encoders, SparkSession}
   
   case class Bla(ts: Timestamp, a: String, b: Double)
   
   implicit val sqlContext = spark.sqlContext
   implicit val encoder = Encoders.product[Bla]
   val memStream = MemoryStream[Bla]
   val now = System.currentTimeMillis()
   val day = 86400000
   memStream.addData(List(
     Bla(new Timestamp(now), "test", 12.34),
     Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
     Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
     Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
   ))
   
   memStream.toDF()
     .writeStream
     .format("iceberg")
     .outputMode("append")
     .option("path", "/tmp/warehouse/test_iceberg_table")
     .option("checkpointLocation", "/tmp/spark-checkpoint")
     .option("fanout-enabled", true)
     .trigger(Trigger.Once())
     .start()
     .awaitTermination()
   ```
   
   Any idea @aokolnychyi? Not really looking forward to bisecting between 0.13.1 and 1.0.0 :3


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1603956247

   Closing in. One workaround is to use `toTable('test_iceberg_table')` instead of `start()`.
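
   For reference, a minimal sketch of that workaround against the reproduction in the original report (reusing its `memStream`, `checkpointDir`, and `test_iceberg_table`; illustrative only and not verified on every Spark/Iceberg combination discussed in this thread):

   ```scala
   // Hedged sketch: write to the table identifier via toTable() instead of
   // .option("path", ...).start(), so the write is resolved through the catalog.
   memStream.toDF()
     .writeStream
     .format("iceberg")
     .outputMode("append")
     .option("fanout-enabled", "true")
     .option("checkpointLocation", checkpointDir)
     .trigger(Trigger.Once())
     .toTable("test_iceberg_table")
     .awaitTermination()
   ```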


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1758774011

   If I remember correctly, Spark 3.3 does not have the function catalog. Therefore, we can't resolve Iceberg transforms. I think the right approach is either to use `toTable` in Spark 3.4 and above or fix Spark.


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "greg-roberts-bbc (via GitHub)" <gi...@apache.org>.
greg-roberts-bbc commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2012098543

   We've found a workaround in our use case. (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).
   
   Our previous flow was:
   
   ```
   # set up readStream
   read_stream = spark.readStream.format(
   <setup read stream>
   .load()
   
   # dataframe operations
   df = read_stream.select(
   <various dataframe operations>
   )
   
   # setup write stream
   write_stream = df.writeStream.format("iceberg").outputMode("append").trigger(
       processingTime=job_args["TRIGGER_PROCESSING_TIME"]
   ).options(**{
       "fanout-enabled": job_args["FANOUT_ENABLED"],
       "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
   }).toTable(TABLE)
   ```
   
   which always failed on the insert with the error described above.
   
   Our new flow is to use processBatch:
   
   ```
   def process_batch(df, batch_id):
       df = df.select(
       <various dataframe operations>
       )
   
       df.writeTo(TABLE).append()
   
   
   read_stream.writeStream.foreachBatch(process_batch).start()
   ```
   
   The above is for completeness, as we're actually using Glue's inbuilt [`GlueContext.forEachBatch`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) but it [does exactly the same thing](https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L602).
   
   With this approach the write no longer fails, and we're able to write to the table with partition transforms (we're using `hour()` to partition our data).
   
   Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a [random string](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout)).
   
   It would be nice to use the built-in write triggers as described [in the docs](https://iceberg.apache.org/docs/latest/spark-structured-streaming/#streaming-writes), but we are happy with a working solution, and this approach also lets us add MERGE behaviour with SQL.
   
   Hope someone else finds this useful!
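
   For readers on the Scala API, a rough sketch of the same foreachBatch approach (illustrative only: `readStreamDf`, the table name, and the checkpoint path are placeholders, and the Glue-specific wrapper is not used here):

   ```scala
   import org.apache.spark.sql.DataFrame

   // Hedged sketch: each micro-batch is appended through the batch
   // DataFrameWriterV2 path, which resolves the table and its partition
   // transforms through the catalog.
   def writeBatch(batchDf: DataFrame, batchId: Long): Unit = {
     // apply any per-batch transformations here, then append
     batchDf.writeTo("test_iceberg_table").append()
   }

   readStreamDf.writeStream
     .foreachBatch(writeBatch _)
     .option("checkpointLocation", "/tmp/spark-checkpoint")
     .start()
     .awaitTermination()
   ```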


[GitHub] [iceberg] meetsitaram commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "meetsitaram (via GitHub)" <gi...@apache.org>.
meetsitaram commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1556564782

   I had the same issue, but it worked when I changed the configs from `spark.sql.catalog.spark_catalog` to `spark.sql.catalog.iceberg`. 
   
   ```scala
   SparkConf()
       .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
       .set("spark.sql.catalog.iceberg.catalog-impl", "hadoop")
       .set("spark.sql.catalog.iceberg.catalog.warehouse", config.warehouse)
       .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
   ```    


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1634209715

   @shuvaevv Sorry for the late reply, I missed this one. Do you have a stacktrace? I've cleaned up the PR, and that should be ready for the next release.


[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1694345235

   > Hello, my friends.
   > 
   > I'm using Structured Streaming writes to a partitioned table, partitioned with Iceberg's transforms.
   > 
   > When writing with .start() I get the same error, but passing toTable('iceberg_table') instead of calling start() worked.
   > 
   > I am using Iceberg 1.3.0 and Spark 3.4.0.
   > 
   > This way it didn't work
   > 
   > ```
   > df.writeStream.format("iceberg").outputMode("append").trigger(
   >     once=True
   > ).option("path", iceberg_table).option("fanout-enabled", "true").option(
   >     "checkpointLocation",
   >     checkpoint_location,
   > ).start().awaitTermination()
   > ```
   > 
   > This way it worked:
   > 
   > ```
   > df.writeStream.format("iceberg").outputMode("append").trigger(
   >     once=True
   > ).option("path", iceberg_table).option("fanout-enabled", "true").option(
   >     "checkpointLocation",
   >     checkpoint_location,
   > ).toTable(
   >     iceberg_table
   > ).awaitTermination()
   > ```
   
   Yeah I got an error with iceberg 1.3 and spark 3.3, that's interesting


[GitHub] [iceberg] adigerber commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "adigerber (via GitHub)" <gi...@apache.org>.
adigerber commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1597723086

   > @adigerber I know that it is a while ago, but which version of Spark are you using?
   
   Either 3.3.1 or 3.3.2.
   IIRC it still happens in Spark 3.3.2 + Iceberg 1.2.1


[GitHub] [iceberg] shuvaevv commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "shuvaevv (via GitHub)" <gi...@apache.org>.
shuvaevv commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1611676271

   > Closing in. One workaround is to use `toTable('test_iceberg_table ')` instead of `start()`.
   
   @Fokko For Spark 3.3.0 + Iceberg 1.2.1, calling the `toTable` method didn't solve the issue.
   Code:
   ```
   df.writeStream
     .format("iceberg")
     .outputMode("append")
     .option("checkpointLocation", checkpointLocation)
     .toTable(tableName)
     .awaitTermination()
   ```
   
   Error: 
   `org.apache.spark.sql.AnalysisException: hours(timestamp) is not currently supported`


[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1669546435

   > The stack trace:
   > 
   > ```
   > 23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
   > org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
   > 	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
   > 	at scala.Option.getOrElse(Option.scala:189)
   > 	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
   > 	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
   > 	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
   > 	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   > 	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   > 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   > 	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
   > 	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
   > 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
   > 	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
   > 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
   > 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
   > 	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
   > 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
   > 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
   > 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
   > 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   > 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   > 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   > 	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   > 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
   > 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
   > 	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
   > 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
   > 	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
   > 	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
   > 	at scala.collection.immutable.List.foldLeft(List.scala:91)
   > 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
   > 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
   > 	at scala.collection.immutable.List.foreach(List.scala:431)
   > 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
   > 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
   > 	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
   > 	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
   > 	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
   > 	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   > 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
   > 	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
   > 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   > 	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
   > 	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
   > 	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
   > 	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
   > 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
   > 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
   > 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
   > 	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   > 	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   > 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
   > 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
   > 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
   > 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   > 	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   > 	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   > 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
   > 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
   > 	at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
   > 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
   > 	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
   > 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   > 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
   > 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   > ```
   > 
   > Spark 3.2 uses a V2 data source.
   
   same stack trace for me on 1.3.1 and spark 3.3


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1597426712

   Spark 3.1 with Iceberg 1.3.0 works fine. It seems that something is off with Spark 3.2+


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1596255719

   @adigerber I know that it is a while ago, but which version of Spark are you using?


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "kaijiezhang0319 (via GitHub)" <gi...@apache.org>.
kaijiezhang0319 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1912673470

   Hello @aokolnychyi, we are hitting a similar issue with Spark 3.3.1 and Iceberg 1.1.0. It worked fine with Spark 3.2.1 and Iceberg 1.1.0, and the issue appeared when we bumped up to Spark 3.3.1. Any suggestions are appreciated, thanks.


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1596254885

   I can reproduce this locally; it occurs from Iceberg 1.0.0 onward.


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1603932406

   Was able to chase this down. Looks like the catalog is not passed down to the execution. 
   
   ```scala
       // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
       sink match {
         case s: SupportsWrite =>
           val relationOpt = plan.catalogAndIdent.map {
             case (catalog, ident) => DataSourceV2Relation.create(s, Some(catalog), Some(ident))
           }
           WriteToMicroBatchDataSource(
             relationOpt,
             table = s,
             query = _logicalPlan,
             queryId = id.toString,
             extraOptions,
             outputMode)
   ```
   
   The `relationOpt` is `None`


[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1669778478

   toTable wasn't a fix


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "greg-roberts-bbc (via GitHub)" <gi...@apache.org>.
greg-roberts-bbc commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1997509467

   We're encountering this issue with Iceberg 1.4.3 on Spark 3.3.0 (AWS Glue 4.0), and using `toTable` hasn't fixed it.
   


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1598538623

   Able to reproduce this:
   ```java
   @Test
   public void testStructuredStreaming() throws TimeoutException, StreamingQueryException {
     sql("DROP TABLE IF EXISTS %s", tableName("structured_streaming"));
   
     sql("create table %s(ts timestamp) using iceberg partitioned by (days(ts))", tableName("structured_streaming"));
   
     List<TimestampRow> nums = new ArrayList<TimestampRow>();
     nums.add(new TimestampRow(Timestamp.valueOf("2021-01-01 00:00:00")));
     nums.add(new TimestampRow(Timestamp.valueOf("2021-01-02 00:00:00")));
     nums.add(new TimestampRow(Timestamp.valueOf("2021-01-03 00:00:00")));
   
     MemoryStream<TimestampRow> ms = new MemoryStream<>(100, spark.sqlContext(), null, Encoders.bean(TimestampRow.class));
   
     ms.addData(JavaConverters.asScalaIteratorConverter(nums.iterator()).asScala().toSeq());
   
     ms.toDF()
             .writeStream()
             .format("iceberg")
             .outputMode("append")
             .option("checkpointLocation", "/tmp/spark-checkpoint")
             .option("fanout-enabled", true)
             .trigger(Trigger.Once())
             .toTable(tableName("structured_streaming"))
             .awaitTermination();
   }
   ```
   
   It seems that a regular (batch) query does not hit `V2ExpressionUtils`.
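
   For contrast, a batch append (a sketch in the Scala of the original reproduction, reusing its `spark`, `Bla`, `now`, and `day`; illustrative only) goes through the catalog-aware `writeTo` path and, per the reports above, does not raise the transform error:

   ```scala
   import java.sql.Timestamp
   import spark.implicits._

   // Hedged sketch: the same rows written as a plain batch DataFrame do not go
   // through the streaming micro-batch planning that hits V2ExpressionUtils.
   Seq(
     Bla(new Timestamp(now), "test", 12.34),
     Bla(new Timestamp(now - 1 * day), "test 1d", 33.34)
   ).toDF()
     .writeTo("test_iceberg_table")
     .append()
   ```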


[GitHub] [iceberg] aokolnychyi commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1687307732

   This is a Spark issue, not an Iceberg issue (at least in 3.4). We may consider fixing Spark 3.3 and older but I am not so sure about 3.4. In Spark 3.4, we are relying on the function catalog API to resolve transforms. @Marcus-Rosti, could you confirm `toTable` works in 3.4? I believe you tried 3.3 before.
   
   @Fokko is right that `start()`, unlike `toTable()`, does not populate the catalog, hence we can't resolve the transforms. I believe the right solution would be to fix Spark to use `SupportsCatalogOptions` when loading `Table` from `TableProvider`.
   
   


[GitHub] [iceberg] arun-dhingra commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "arun-dhingra (via GitHub)" <gi...@apache.org>.
arun-dhingra commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1672887119

   same issue for me


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "UtkarshSharma2612 (via GitHub)" <gi...@apache.org>.
UtkarshSharma2612 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2069280084

   I am writing to Iceberg with Spark Structured Streaming; my Spark version is 3.4.2 and my Iceberg version is 2. I am also facing this issue, and changing `.option()` to `.toTable` didn't help. 
   `24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter types (timestamp) is used in partition transforms, but its definition couldn't be found in the function catalog provided
   24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter types (timestamp) is used in partition transforms, but its definition couldn't be found in the function catalog provided
   24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId = c873f460-5c5b-43f3-a182-9bfa09d29b34] terminated with error
   org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
   	at scala.Option.getOrElse(Option.scala:189)
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
   	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
   	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
   	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
   	at scala.collection.immutable.List.foldLeft(List.scala:91)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
   	at scala.collection.immutable.List.foreach(List.scala:431)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
   	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
   	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
   24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889] terminated with error
   org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
   	at scala.Option.getOrElse(Option.scala:189)
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
   	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
   	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
   	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
   	at scala.collection.immutable.List.foldLeft(List.scala:91)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
   	at scala.collection.immutable.List.foreach(List.scala:431)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
   	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
   	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
   24/04/22 12:15:45 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
   24/04/22 12:15:45 INFO AppInfoParser: App info kafka.admin.client for adminclient-1 unregistered
   24/04/22 12:15:45 INFO Metrics: Metrics scheduler closed
   24/04/22 12:15:45 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
   24/04/22 12:15:45 INFO Metrics: Metrics reporters closed
   24/04/22 12:15:45 INFO MicroBatchExecution: Async log purge executor pool for query [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889] has been shutdown
   24/04/22 12:15:45 INFO Metrics: Metrics scheduler closed
   24/04/22 12:15:45 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
   24/04/22 12:15:45 INFO Metrics: Metrics reporters closed
   24/04/22 12:15:45 INFO MicroBatchExecution: Async log purge executor pool for query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId = c873f460-5c5b-43f3-a182-9bfa09d29b34] has been shutdown
   24/04/22 12:15:45 ERROR TestJobPipelineImpl: stream error: {}
   org.apache.spark.sql.streaming.StreamingQueryException: years(timestamp) is not currently supported
   === Streaming Query ===
   Identifier: [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889]
   Current Committed Offsets: {}
   Current Available Offsets: {KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]: {"com.engati.write.user.journey.conversion.data.to.db":{"0":729547}}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   WriteToMicroBatchDataSource RelationV2[bot_ref#441, user_id#442, timestamp#443, source_type#444, conversion_type#445, ad_id#446, ad_source#447, ad_type#448, broadcast_id#449, broadcast_response_type#450, flow_id#451, attribute_id#452] local.user_journey.conversion_analytics local.user_journey.conversion_analytics, local.user_journey.conversion_analytics, 4cf32767-e385-43b7-8550-09125c6f638c, [checkpointLocation=/tmp/checkpointOne, fanout-enabled=true], Append
   +- Project [bot_ref#78, user_id#91, timestamp#143, source_type#156, conversion_type#195, ad_id#104, ad_source#117, ad_type#130, broadcast_id#169, broadcast_response_type#182, flow_id#208, attribute_id#221]
      +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43 AS attribute_id#221, bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversion_type#195, flow_id#208, source_type#156, timestamp#143, user_id#91]
         +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversion_type#195, flowId#48 AS flow_id#208, source_type#156, timestamp#143, user_id#91]
            +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversionType#47 AS conversion_type#195, flowId#48, source_type#156, timestamp#143, user_id#91]
               +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcast_id#169, broadcastResponseType#46 AS broadcast_response_type#182, conversionType#47, flowId#48, source_type#156, timestamp#143, user_id#91]
                  +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcastId#45 AS broadcast_id#169, broadcastResponseType#46, conversionType#47, flowId#48, source_type#156, timestamp#143, user_id#91]
                     +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49 AS source_type#156, timestamp#143, user_id#91]
                        +- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50 AS timestamp#143, user_id#91]
                           +- Project [ad_id#104, ad_source#117, adType#42 AS ad_type#130, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, user_id#91]
                              +- Project [ad_id#104, adSource#41 AS ad_source#117, adType#42, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, user_id#91]
                                 +- Project [adId#40 AS ad_id#104, adSource#41, adType#42, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, user_id#91]
                                    +- Project [adId#40, adSource#41, adType#42, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, userId#51 AS user_id#91]
                                       +- Project [adId#40, adSource#41, adType#42, attributeId#43, botRef#44 AS bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, userId#51]
                                          +- TypedFilter com.engati.analytics.etl.pipeline.impl.TestJobPipelineImpl$$Lambda$1655/1994143461@2556e117, class com.engati.analytics.etl.extract.models.ConversionDTO, [StructField(adId,StringType,true), StructField(adSource,StringType,true), StructField(adType,StringType,true), StructField(attributeId,IntegerType,true), StructField(botRef,IntegerType,true), StructField(broadcastId,StringType,true), StructField(broadcastResponseType,StringType,true), StructField(conversionType,StringType,true), StructField(flowId,IntegerType,true), StructField(sourceType,StringType,true), StructField(timestamp,TimestampType,true), StructField(userId,StringType,true)], initializejavabean(newInstance(class com.engati.analytics.etl.extract.models.ConversionDTO), (setFlowId,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, cast(flowId#48 as int), true, false, true)), (setConversionType,cast(conversionType#47 as string).toString)
 , (setTimestamp,staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, cast(timestamp#50 as timestamp), true, false, true)), (setBotRef,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, cast(botRef#44 as int), true, false, true)), (setBroadcastId,cast(broadcastId#45 as string).toString), (setAdType,cast(adType#42 as string).toString), (setAdSource,cast(adSource#41 as string).toString), (setBroadcastResponseType,cast(broadcastResponseType#46 as string).toString), (setAttributeId,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, cast(attributeId#43 as int), true, false, true)), (setSourceType,cast(sourceType#49 as string).toString), (setUserId,cast(userId#51 as string).toString), (setAdId,cast(adId#40 as string).toString))
                                             +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdId, true, false, true) AS adId#40, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdSource, true, false, true) AS adSource#41, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdType, true, false, true) AS adType#42, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAttributeId.intValue AS attributeId#43, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getB
 otRef.intValue AS botRef#44, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getBroadcastId, true, false, true) AS broadcastId#45, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getBroadcastResponseType, true, false, true) AS broadcastResponseType#46, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getConversionType, true, false, true) AS conversionType#47, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getFlowId.intValue AS flowId#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnul
 l(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getSourceType, true, false, true) AS sourceType#49, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getTimestamp, true, false, true) AS timestamp#50, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getUserId, true, false, true) AS userId#51]
                                                +- MapElements com.engati.analytics.etl.pipeline.impl.TestJobPipelineImpl$$Lambda$1641/1836606934@1e93834b, class java.lang.String, [StructField(value,StringType,true)], obj#39: com.engati.analytics.etl.extract.models.ConversionDTO
                                                   +- DeserializeToObject cast(value#21 as string).toString, obj#38: java.lang.String
                                                      +- Project [cast(value#8 as string) AS value#21]
                                                         +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@48bd89fe, KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]
   
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
   Caused by: org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
   	at scala.Option.getOrElse(Option.scala:189)
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
   	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
   	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
   	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
   	at scala.collection.immutable.List.foldLeft(List.scala:91)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
   	at scala.collection.immutable.List.foreach(List.scala:431)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
   	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
   	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
   	... 1 more
   24/04/22 12:15:45 INFO SparkContext: Invoking stop() from shutdown hook
   24/04/22 12:15:45 INFO SparkContext: SparkContext is stopping with exitCode 0.
   24/04/22 12:15:45 INFO SparkUI: Stopped Spark web UI at http://ip-10-12-72-49.ap-south-1.compute.internal:4040
   24/04/22 12:15:45 INFO StandaloneSchedulerBackend: Shutting down all executors
   24/04/22 12:15:45 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint: Asking each executor to shut down
   24/04/22 12:15:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   24/04/22 12:15:45 INFO MemoryStore: MemoryStore cleared
   24/04/22 12:15:45 INFO BlockManager: BlockManager stopped
   24/04/22 12:15:45 INFO BlockManagerMaster: BlockManagerMaster stopped
   24/04/22 12:15:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   24/04/22 12:15:45 INFO SparkContext: Successfully stopped SparkContext
   24/04/22 12:15:45 INFO ShutdownHookManager: Shutdown hook called
   24/04/22 12:15:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-bd10ae64-0bf2-4638-a070-2074fe0aeef7
   24/04/22 12:15:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-75a1686e-6c8f-483b-9923-0d09834ccbd3


[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1597704921

   The stack trace:
   
   ```
   23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
   org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
   	at scala.Option.getOrElse(Option.scala:189)
   	at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
   	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
   	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
   	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
   	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
   	at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
   	at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
   	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
   	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
   	at scala.collection.immutable.List.foldLeft(List.scala:91)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
   	at scala.collection.immutable.List.foreach(List.scala:431)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
   	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
   	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
   	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
   	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
   	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
   	at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
   	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
   	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   ```
   
   Spark 3.2 uses a V2 data source.


[GitHub] [iceberg] FabricioZGalvani commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "FabricioZGalvani (via GitHub)" <gi...@apache.org>.
FabricioZGalvani commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1688947126

   Hello, my friends.
   
   I'm using Structured Streaming to write to a partitioned table, and I'm partitioning with Iceberg's transforms.
   
   Writing with .start(), I get the same error, but switching to .toTable('iceberg_table') instead of .start() works.
   
   I am using Iceberg 1.3.0 and Spark 3.4.0.
   
   This way it didn't work:
   ```python
   (
       df.writeStream.format("iceberg")
       .outputMode("append")
       .trigger(once=True)
       .option("path", iceberg_table)
       .option("fanout-enabled", "true")
       .option("checkpointLocation", checkpoint_location)
       .start()
       .awaitTermination()
   )
   ```
   
   This way it worked:
   ```python
   (
       df.writeStream.format("iceberg")
       .outputMode("append")
       .trigger(once=True)
       .option("path", iceberg_table)
       .option("fanout-enabled", "true")
       .option("checkpointLocation", checkpoint_location)
       .toTable(iceberg_table)
       .awaitTermination()
   )
   ```
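   
   For reference, here is a minimal, self-contained sketch of the `.toTable()` approach described above. It assumes a SparkSession `spark` already configured for Iceberg, an existing table `db.events(ts timestamp, a string, b double)` partitioned by `days(ts)`, and uses the built-in `rate` source in place of the real stream; the table name, schema and checkpoint path are illustrative and not taken from this issue.
   
   ```python
   # Minimal sketch of the .toTable() workaround (names and paths are illustrative).
   # The `rate` source emits `timestamp` and `value` columns; reshape them to match
   # the assumed table schema (ts timestamp, a string, b double).
   df = (
       spark.readStream.format("rate").load()
       .selectExpr(
           "timestamp AS ts",
           "CAST(value AS STRING) AS a",
           "CAST(value AS DOUBLE) AS b",
       )
   )
   
   query = (
       df.writeStream.format("iceberg")
       .outputMode("append")
       .option("fanout-enabled", "true")
       .option("checkpointLocation", "/tmp/checkpoints/events")  # illustrative path
       .toTable("db.events")  # resolve the target through the catalog, not a path
   )
   query.awaitTermination()
   ```
   
   The only change from the failing variant is the final call: `.toTable(...)` resolves the target table through the configured catalog instead of the `path` option, which appears to be what avoids the "days(ts) is not currently supported" error reported in this issue.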


[GitHub] [iceberg] arundh93 commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions

Posted by "arundh93 (via GitHub)" <gi...@apache.org>.
arundh93 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1702381647

   > Hello, my friends.
   > 
   > I'm using Structured streaming writes to partitioned table, and I'm using Iceberg's transformations to partition.
   > 
   > Trying to record with .start(), I get the same error, but testing to use the approach of passing toTable('iceberg_table') instead of start(), it worked.
   > 
   > I am using Iceberg 1.3.0 and Spark 3.4.0.
   > 
   > This way it didn't work
   > 
   > df.writeStream.format("iceberg").outputMode("append").trigger(
   >     once=True
   > ).option("path", iceberg_table).option("fanout-enabled", "true").option(
   >     "checkpointLocation",
   >     checkpoint_location,
   > ).start().awaitTermination()
   > This way it worked:
   > 
   > df.writeStream.format("iceberg").outputMode("append").trigger(
   >     once=True
   > ).option("path", iceberg_table).option("fanout-enabled", "true").option(
   >     "checkpointLocation",
   >     checkpoint_location,
   > ).toTable(
   >     iceberg_table
   > ).awaitTermination()
   
   For me as well, toTable works in Spark 3.4.
   
   However, it fails on a multi-executor setup.


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "stevenlii (via GitHub)" <gi...@apache.org>.
stevenlii commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2047156687

   > We've found a workaround in our use case. (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).
   > 
   > Our previous flow was:
   > 
   > ```
   > TABLE = "glue_catalog.<database>.<table>"
   > 
   > # set up readStream
   > read_stream = spark.readStream.format(
   > <setup read stream>
   > .load()
   > 
   > # dataframe operations
   > df = read_stream.select(
   > <various dataframe operations>
   > )
   > 
   > # setup write stream
   > write_stream = df.writeStream.format("iceberg").outputMode("append").trigger(
   >     processingTime=job_args["TRIGGER_PROCESSING_TIME"]
   > ).options(**{
   >     "fanout-enabled": job_args["FANOUT_ENABLED"],
   >     "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
   > }).toTable(TABLE)
   > ```
   > 
   > which always failed on the insert with the above described error.
   > 
   > Our new flow is to use processBatch:
   > 
   > ```
   > def process_batch(df, batch_id):
   >     df = df.select(
   >     <various dataframe operations>
   >     )
   > 
   >     df.writeTo(TABLE).append()
   > 
   > 
   > read_stream.writeStream.foreachBatch(process_batch).start()
   > ```
   > 
   > The above is for completeness, as we're actually using Glue's inbuilt [`GlueContext.forEachBatch`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch) but it [does exactly the same thing](https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L602).
   > 
   > and this is no longer failing. We're able to write to the table with partition transforms (we're using `hour()` to partition our data).
   > 
   > Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a [random string](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout), where previously this wasn't happening).
   > 
   > It would be nice to use the inbuilt write triggers as described [in the docs](https://iceberg.apache.org/docs/latest/spark-structured-streaming/#streaming-writes) but we are happy with a working solution. and this allows us to add MERGE behaviour in with SQL.
   > 
   > Hope someone else finds this useful!
   
   But this way there is no checkpointLocation. How do you manage the offset?


Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]

Posted by "greg-roberts-bbc (via GitHub)" <gi...@apache.org>.
greg-roberts-bbc commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2049509117

   @stevenlii 
   
   > But this way there is no checkpointLocation. How do you manage the offset?
   
   As I said, we're using `GlueContext.forEachBatch`, which allows you to specify the checkpoint location as follows:
   
   ```python
   glue_context.forEachBatch(
       frame=read_stream,
       batch_function=process_batch,
       options={
           "windowSize": job_args["TRIGGER_PROCESSING_TIME"],
           "fanout-enabled": job_args["FANOUT_ENABLED"],
           "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
           "persistDataFrame": "false",
       }
   )
   ```
   
   The equivalent call with plain `writeStream.foreachBatch` would be something like:
   
   `read_stream.writeStream.foreachBatch(process_batch).trigger(processingTime=windowSize).option("checkpointLocation", checkpointLocation).start()`
   
   As can be seen in the Glue source code I linked above, here: https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L641
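   
   For completeness, a rough end-to-end sketch of that plain `writeStream.foreachBatch` equivalent, with the checkpoint location set on the stream writer; the streaming DataFrame `read_stream`, the table name, the trigger interval and the checkpoint path are illustrative placeholders rather than values from this thread.
   
   ```python
   # Sketch of the foreachBatch workaround with an explicit checkpoint location.
   TABLE = "glue_catalog.analytics.events"  # illustrative table name
   
   def process_batch(df, batch_id):
       # Per-batch transformations would go here; the append uses the batch
       # DataFrameWriterV2 API and goes through the catalog, as in the quoted workaround.
       df.writeTo(TABLE).append()
   
   (
       read_stream.writeStream
       .foreachBatch(process_batch)
       .trigger(processingTime="60 seconds")                              # illustrative interval
       .option("checkpointLocation", "s3://my-bucket/checkpoints/events")  # illustrative path
       .start()
       .awaitTermination()
   )
   ```
   
   The source offsets are still tracked in the streaming checkpoint as usual; only the per-batch write is routed through `writeTo(...).append()`, which addresses the question about offset management above.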
   
   

