Posted to issues@iceberg.apache.org by "adigerber (via GitHub)" <gi...@apache.org> on 2023/03/28 14:00:32 UTC
[GitHub] [iceberg] adigerber opened a new issue, #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
adigerber opened a new issue, #7226:
URL: https://github.com/apache/iceberg/issues/7226
### Apache Iceberg version
1.2.0 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
According to the documentation, when using Iceberg, one should set `spark.sql.extensions` to `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`, but setting this property seems to cause an exception to be thrown when trying to write to an Iceberg table using Spark structured streaming.
The exception that is thrown is:
```
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]
```
Code to reproduce:
```scala
package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]

    val memStream = MemoryStream[Bla]
    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}
```
The code works as expected when the statement that configures `spark.sql.extensions` is commented out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1668836893
same issue for me
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1596258379
Able to reproduce this using:
```sh
rm -rf /tmp/warehouse
rm -rf /tmp/spark-checkpoint
mkdir -p /tmp/warehouse
mkdir -p /tmp/spark-checkpoint
./bin/spark-shell \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.demo.type=hadoop \
  --conf spark.sql.catalog.demo.warehouse=/tmp/warehouse \
  --conf spark.sql.warehouse.dir=/tmp/warehouse \
  --conf spark.sql.defaultCatalog=demo
```
```scala
spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

import java.nio.file.Files
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

case class Bla(ts: Timestamp, a: String, b: Double)

implicit val sqlContext = spark.sqlContext
implicit val encoder = Encoders.product[Bla]

val memStream = MemoryStream[Bla]
val now = System.currentTimeMillis()
val day = 86400000
memStream.addData(List(
  Bla(new Timestamp(now), "test", 12.34),
  Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
  Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
  Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
))

memStream.toDF()
  .writeStream
  .format("iceberg")
  .outputMode("append")
  .option("path", "/tmp/warehouse/test_iceberg_table")
  .option("checkpointLocation", "/tmp/spark-checkpoint")
  .option("fanout-enabled", true)
  .trigger(Trigger.Once())
  .start()
  .awaitTermination()
```
Any idea @aokolnychyi? Not really looking forward to bisecting between 0.13.1 and 1.0.0 :3
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1603956247
Closing in. One workaround is to use `toTable("test_iceberg_table")` instead of `start()`.
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1758774011
If I remember correctly, Spark 3.3 does not have the function catalog. Therefore, we can't resolve Iceberg transforms. I think the right approach is either to use `toTable` in Spark 3.4 and above or fix Spark.
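To make the version dependence concrete, here is a minimal PySpark sketch that takes the `toTable` path only on Spark 3.4 and above, following the suggestion above. The helper names `supports_to_table` and `start_append_stream` are hypothetical, `df` is assumed to be a streaming DataFrame, and the 3.4 cutoff is taken from this thread rather than verified independently.

```python
def supports_to_table(spark_version: str) -> bool:
    """Return True for Spark 3.4+, where this thread suggests toTable works.

    Only the major and minor components are inspected, so a vendor suffix
    on the patch component (for example "3.3.0-amzn-1") is ignored.
    """
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    return (major, minor) >= (3, 4)


def start_append_stream(df, table_name, checkpoint_location):
    """Start an append-mode streaming write addressed by table identifier.

    `df` is assumed to be a streaming DataFrame. toTable() starts the query
    and returns the StreamingQuery handle.
    """
    return (
        df.writeStream.format("iceberg")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .toTable(table_name)
    )
```

A caller might guard the write with `if supports_to_table(spark.version):` and then call `start_append_stream(df, "test_iceberg_table", "/tmp/spark-checkpoint").awaitTermination()`. On older versions a different workaround is needed.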
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "greg-roberts-bbc (via GitHub)" <gi...@apache.org>.
greg-roberts-bbc commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2012098543
We've found a workaround for our use case (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).
Our previous flow was:
```
# set up readStream
read_stream = spark.readStream.format(
<setup read stream>
.load()
# dataframe operations
df = read_stream.select(
<various dataframe operations>
)
# setup write stream
write_stream = df.writeStream.format("iceberg").outputMode("append").trigger(
processingTime=job_args["TRIGGER_PROCESSING_TIME"]
).options(**{
"fanout-enabled": job_args["FANOUT_ENABLED"],
"checkpointLocation": job_args["CHECKPOINT_LOCATION"],
}).toTable(TABLE)
```
This always failed on the insert with the error described above.
Our new flow is to use `foreachBatch`:
```
def process_batch(df, batch_id):
    df = df.select(
        <various dataframe operations>
    )
    df.writeTo(TABLE).append()

read_stream.writeStream.foreachBatch(process_batch).start()
```
This is no longer failing: we're able to write to the table with partition transforms (we're using `hour()` to partition our data).
The snippet above is for completeness; we're actually using Glue's inbuilt [`GlueContext.forEachBatch`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch), but it [does exactly the same thing](https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L602).
Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a [random string](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout)).
It would be nice to use the inbuilt write triggers as described [in the docs](https://iceberg.apache.org/docs/latest/spark-structured-streaming/#streaming-writes), but we are happy with a working solution, and this also allows us to add MERGE behaviour with SQL.
Hope someone else finds this useful!
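As an illustration of the MERGE variant mentioned above, the following is a sketch of a `foreachBatch` handler that upserts each micro-batch with `MERGE INTO`. It is not the code from this comment: the helper names, the temporary view name `batch_updates`, and the choice of key columns are assumptions. `MERGE INTO` on Iceberg tables requires the Iceberg SQL extensions to be enabled, and `DataFrame.sparkSession` is available in PySpark 3.3 and later.

```python
def build_merge_sql(table, view, key_columns):
    """Build a MERGE statement that upserts rows from `view` into `table`."""
    on_clause = " AND ".join(f"t.{col} = s.{col}" for col in key_columns)
    return (
        f"MERGE INTO {table} t USING {view} s ON {on_clause} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


def make_merge_writer(table, key_columns, view="batch_updates"):
    """Return a function with the (batch_df, batch_id) signature that
    foreachBatch expects."""

    def process_batch(batch_df, batch_id):
        # Register the micro-batch as a temp view, then run the MERGE in
        # the same session that owns the view.
        batch_df.createOrReplaceTempView(view)
        batch_df.sparkSession.sql(build_merge_sql(table, view, key_columns))

    return process_batch
```

It would be wired in the same way as `process_batch` above, for example `read_stream.writeStream.foreachBatch(make_merge_writer(TABLE, ["id"])).start()`, where `"id"` stands in for whatever columns identify a row in the target table.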
[GitHub] [iceberg] meetsitaram commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "meetsitaram (via GitHub)" <gi...@apache.org>.
meetsitaram commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1556564782
I had the same issue, but it worked when I changed the configs from `spark.sql.catalog.spark_catalog` to `spark.sql.catalog.iceberg`.
```scala
SparkConf()
.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.iceberg.catalog-impl", "hadoop")
.set("spark.sql.catalog.iceberg.catalog.warehouse", config.warehouse)
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
```
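For comparison, here is a small sketch that assembles the settings for a separately named Hadoop catalog using the key names from the `spark-shell` reproduction elsewhere in this thread (`.type` and `.warehouse`), which differ from the keys in the snippet above. The function name `iceberg_catalog_conf` and the example arguments are hypothetical.

```python
ICEBERG_EXTENSIONS = (
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
)


def iceberg_catalog_conf(catalog_name, warehouse):
    """Return Spark settings for a named Iceberg Hadoop catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse,
        "spark.sql.extensions": ICEBERG_EXTENSIONS,
    }
```

Each pair can be passed to `SparkSession.builder.config(key, value)` before `getOrCreate()`; tables are then addressed through the named catalog, for example `iceberg.default.test_iceberg_table`.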
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1634209715
@shuvaevv Sorry for the late reply, I missed this one. Do you have a stack trace? I've cleaned up the PR, and that should be ready for the next release.
[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1694345235
> Hello, my friends.
>
> I'm using Structured streaming writes to partitioned table, and I'm using Iceberg's transformations to partition.
>
> Trying to record with .start(), I get the same error, but testing to use the approach of passing toTable('iceberg_table') instead of start(), it worked.
>
> I am using Iceberg 1.3.0 and Spark 3.4.0.
>
> This way it didn't work
>
> ```
> df.writeStream.format("iceberg").outputMode("append").trigger(
> once=True
> ).option("path", iceberg_table).option("fanout-enabled", "true").option(
> "checkpointLocation",
> checkpoint_location,
> ).start().awaitTermination()
> ```
>
> This way it worked:
>
> ```
> df.writeStream.format("iceberg").outputMode("append").trigger(
> once=True
> ).option("path", iceberg_table).option("fanout-enabled", "true").option(
> "checkpointLocation",
> checkpoint_location,
> ).toTable(
> iceberg_table
> ).awaitTermination()
> ```
Yeah, I got an error with Iceberg 1.3 and Spark 3.3; that's interesting.
[GitHub] [iceberg] adigerber commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "adigerber (via GitHub)" <gi...@apache.org>.
adigerber commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1597723086
> @adigerber I know that it is a while ago, but which version of Spark are you using?
Either 3.3.1 or 3.3.2.
IIRC it still happens in Spark 3.3.2 + Iceberg 1.2.1
[GitHub] [iceberg] shuvaevv commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "shuvaevv (via GitHub)" <gi...@apache.org>.
shuvaevv commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1611676271
> Closing in. One workaround is to use `toTable("test_iceberg_table")` instead of `start()`.
@Fokko For Spark 3.3.0 + Iceberg 1.2.1, calling the `toTable` method didn't solve the issue.
Code:
```
df.writeStream
.format("iceberg")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.toTable(tableName)
.awaitTermination()
```
Error:
`org.apache.spark.sql.AnalysisException: hours(timestamp) is not currently supported`
[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1669546435
> The stack trace:
>
> ```
> 23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
> org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
> at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
> at scala.Option.getOrElse(Option.scala:189)
> at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
> at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
> at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
> at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
> at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
> at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
> at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
> at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
> at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
> at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
> at scala.collection.immutable.List.foldLeft(List.scala:91)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
> at scala.collection.immutable.List.foreach(List.scala:431)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
> at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
> at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
> at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
> at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
> at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
> at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
> at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
> at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
> at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
> at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
> at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
> ```
>
> Spark 3.2 uses a V2 data source.
Same stack trace for me on Iceberg 1.3.1 and Spark 3.3.
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1597426712
Spark 3.1 with Iceberg 1.3.0 works fine. It seems that something is off with Spark 3.2+
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1596255719
@adigerber I know that it is a while ago, but which version of Spark are you using?
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "kaijiezhang0319 (via GitHub)" <gi...@apache.org>.
kaijiezhang0319 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1912673470
Hello @aokolnychyi. We are hitting a similar issue with Spark 3.3.1 and Iceberg 1.1.0. We previously used Spark 3.2.1 with Iceberg 1.1.0 and it worked fine; the issue appeared when we bumped to Spark 3.3.1. Any suggestions are appreciated. Thanks.
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1596254885
Can reproduce this locally. Found in Iceberg 1.0.0 onward.
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1603932406
Was able to chase this down. Looks like the catalog is not passed down to the execution.
```scala
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
sink match {
  case s: SupportsWrite =>
    val relationOpt = plan.catalogAndIdent.map {
      case (catalog, ident) => DataSourceV2Relation.create(s, Some(catalog), Some(ident))
    }
    WriteToMicroBatchDataSource(
      relationOpt,
      table = s,
      query = _logicalPlan,
      queryId = id.toString,
      extraOptions,
      outputMode)
```
The `relationOpt` is `None`
[GitHub] [iceberg] Marcus-Rosti commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Marcus-Rosti (via GitHub)" <gi...@apache.org>.
Marcus-Rosti commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1669778478
toTable wasn't a fix
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "greg-roberts-bbc (via GitHub)" <gi...@apache.org>.
greg-roberts-bbc commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1997509467
We're encountering this issue using Iceberg 1.4.3 on Spark 3.3.0 (AWS Glue 4.0), and using `toTable` hasn't fixed it.
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1598538623
Able to reproduce this:
```java
@Test
public void testStructuredStreaming() throws TimeoutException, StreamingQueryException {
  sql("DROP TABLE IF EXISTS %s", tableName("structured_streaming"));
  sql("create table %s(ts timestamp) using iceberg partitioned by (days(ts))", tableName("structured_streaming"));

  List<TimestampRow> nums = new ArrayList<TimestampRow>();
  nums.add(new TimestampRow(Timestamp.valueOf("2021-01-01 00:00:00")));
  nums.add(new TimestampRow(Timestamp.valueOf("2021-01-02 00:00:00")));
  nums.add(new TimestampRow(Timestamp.valueOf("2021-01-03 00:00:00")));

  MemoryStream<TimestampRow> ms = new MemoryStream<>(100, spark.sqlContext(), null, Encoders.bean(TimestampRow.class));
  ms.addData(JavaConverters.asScalaIteratorConverter(nums.iterator()).asScala().toSeq());

  ms.toDF()
      .writeStream()
      .format("iceberg")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/spark-checkpoint")
      .option("fanout-enabled", true)
      .trigger(Trigger.Once())
      .toTable(tableName("structured_streaming"))
      .awaitTermination();
}
```
It seems that a regular query does not hit `V2ExpressionUtils`.
[GitHub] [iceberg] aokolnychyi commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1687307732
This is a Spark issue, not an Iceberg issue (at least in 3.4). We may consider fixing Spark 3.3 and older but I am not so sure about 3.4. In Spark 3.4, we are relying on the function catalog API to resolve transforms. @Marcus-Rosti, could you confirm `toTable` works in 3.4? I believe you tried 3.3 before.
@Fokko is right that `start()`, unlike `toTable()`, does not populate the catalog, hence we can't resolve the transforms. I believe the right solution would be to fix Spark to use `SupportsCatalogOptions` when loading `Table` from `TableProvider`.
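
To make the mechanism described above concrete, here is a minimal, self-contained sketch in plain Java. It is a toy model only, not Spark or Iceberg code: the class `TransformResolutionSketch`, the `ICEBERG_FUNCTIONS` map, and the `resolve` helper are hypothetical names introduced for illustration. The assumption it encodes is the one stated in this thread: a partition transform such as `days(ts)` can only be resolved if a function catalog was passed down with the write, and when it was not, the lookup ends in the "is not currently supported" error.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class TransformResolutionSketch {

    // Hypothetical stand-in for a function catalog: transform name -> implementation.
    // The "days" entry converts microseconds since the epoch to whole days.
    static final Map<String, Function<Long, Long>> ICEBERG_FUNCTIONS = Map.of(
        "days", micros -> Math.floorDiv(micros, 86_400_000_000L));

    // Looks up a transform by name. An empty Optional models the case where no
    // catalog was passed down; a missing entry models an unknown function.
    // Both cases fail with the message seen in this issue.
    static Function<Long, Long> resolve(
            String name,
            String column,
            Optional<Map<String, Function<Long, Long>>> catalog) {
        return catalog
            .map(c -> c.get(name)) // a null lookup result becomes an empty Optional
            .orElseThrow(() -> new UnsupportedOperationException(
                name + "(" + column + ") is not currently supported"));
    }

    public static void main(String[] args) {
        // Catalog present: the transform resolves and can be applied.
        Function<Long, Long> days = resolve("days", "ts", Optional.of(ICEBERG_FUNCTIONS));
        System.out.println(days.apply(86_400_000_000L));

        // Catalog absent: resolution fails before any data is written.
        try {
            resolve("days", "ts", Optional.empty());
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch only mirrors the shape of the failure (an optional catalog followed by a lookup that throws when nothing is found). Which Spark code paths actually populate the catalog depends on the Spark version, as discussed in the comments in this thread.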
[GitHub] [iceberg] arun-dhingra commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "arun-dhingra (via GitHub)" <gi...@apache.org>.
arun-dhingra commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1672887119
Same issue for me.
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "UtkarshSharma2612 (via GitHub)" <gi...@apache.org>.
UtkarshSharma2612 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2069280084
I am writing to Iceberg using Spark Structured Streaming; my Spark version is 3.4.2 and my Iceberg version is 2. I am also facing this issue, and changing `.option()` to `.toTable` didn't help.
`24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter types (timestamp) is used in partition transforms, but its definition couldn't be found in the function catalog provided
24/04/22 12:15:45 WARN V2ExpressionUtils: V2 function years with parameter types (timestamp) is used in partition transforms, but its definition couldn't be found in the function catalog provided
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId = c873f460-5c5b-43f3-a182-9bfa09d29b34] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
24/04/22 12:15:45 ERROR MicroBatchExecution: Query [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889] terminated with error
org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
24/04/22 12:15:45 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
24/04/22 12:15:45 INFO AppInfoParser: App info kafka.admin.client for adminclient-1 unregistered
24/04/22 12:15:45 INFO Metrics: Metrics scheduler closed
24/04/22 12:15:45 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
24/04/22 12:15:45 INFO Metrics: Metrics reporters closed
24/04/22 12:15:45 INFO MicroBatchExecution: Async log purge executor pool for query [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889] has been shutdown
24/04/22 12:15:45 INFO Metrics: Metrics scheduler closed
24/04/22 12:15:45 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
24/04/22 12:15:45 INFO Metrics: Metrics reporters closed
24/04/22 12:15:45 INFO MicroBatchExecution: Async log purge executor pool for query [id = 5222ef61-86e7-4f64-981c-6f330483730b, runId = c873f460-5c5b-43f3-a182-9bfa09d29b34] has been shutdown
24/04/22 12:15:45 ERROR TestJobPipelineImpl: stream error: {}
org.apache.spark.sql.streaming.StreamingQueryException: years(timestamp) is not currently supported
=== Streaming Query ===
Identifier: [id = 4cf32767-e385-43b7-8550-09125c6f638c, runId = 6de5e2e7-7638-4289-9a5c-aa45f2281889]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]: {"com.engati.write.user.journey.conversion.data.to.db":{"0":729547}}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource RelationV2[bot_ref#441, user_id#442, timestamp#443, source_type#444, conversion_type#445, ad_id#446, ad_source#447, ad_type#448, broadcast_id#449, broadcast_response_type#450, flow_id#451, attribute_id#452] local.user_journey.conversion_analytics local.user_journey.conversion_analytics, local.user_journey.conversion_analytics, 4cf32767-e385-43b7-8550-09125c6f638c, [checkpointLocation=/tmp/checkpointOne, fanout-enabled=true], Append
+- Project [bot_ref#78, user_id#91, timestamp#143, source_type#156, conversion_type#195, ad_id#104, ad_source#117, ad_type#130, broadcast_id#169, broadcast_response_type#182, flow_id#208, attribute_id#221]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43 AS attribute_id#221, bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversion_type#195, flow_id#208, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversion_type#195, flowId#48 AS flow_id#208, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcast_id#169, broadcast_response_type#182, conversionType#47 AS conversion_type#195, flowId#48, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcast_id#169, broadcastResponseType#46 AS broadcast_response_type#182, conversionType#47, flowId#48, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcastId#45 AS broadcast_id#169, broadcastResponseType#46, conversionType#47, flowId#48, source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49 AS source_type#156, timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, ad_type#130, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50 AS timestamp#143, user_id#91]
+- Project [ad_id#104, ad_source#117, adType#42 AS ad_type#130, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, user_id#91]
+- Project [ad_id#104, adSource#41 AS ad_source#117, adType#42, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, user_id#91]
+- Project [adId#40 AS ad_id#104, adSource#41, adType#42, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, user_id#91]
+- Project [adId#40, adSource#41, adType#42, attributeId#43, bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, userId#51 AS user_id#91]
+- Project [adId#40, adSource#41, adType#42, attributeId#43, botRef#44 AS bot_ref#78, broadcastId#45, broadcastResponseType#46, conversionType#47, flowId#48, sourceType#49, timestamp#50, userId#51]
+- TypedFilter com.engati.analytics.etl.pipeline.impl.TestJobPipelineImpl$$Lambda$1655/1994143461@2556e117, class com.engati.analytics.etl.extract.models.ConversionDTO, [StructField(adId,StringType,true), StructField(adSource,StringType,true), StructField(adType,StringType,true), StructField(attributeId,IntegerType,true), StructField(botRef,IntegerType,true), StructField(broadcastId,StringType,true), StructField(broadcastResponseType,StringType,true), StructField(conversionType,StringType,true), StructField(flowId,IntegerType,true), StructField(sourceType,StringType,true), StructField(timestamp,TimestampType,true), StructField(userId,StringType,true)], initializejavabean(newInstance(class com.engati.analytics.etl.extract.models.ConversionDTO), (setFlowId,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, cast(flowId#48 as int), true, false, true)), (setConversionType,cast(conversionType#47 as string).toString), (setTimestamp,staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, cast(timestamp#50 as timestamp), true, false, true)), (setBotRef,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, cast(botRef#44 as int), true, false, true)), (setBroadcastId,cast(broadcastId#45 as string).toString), (setAdType,cast(adType#42 as string).toString), (setAdSource,cast(adSource#41 as string).toString), (setBroadcastResponseType,cast(broadcastResponseType#46 as string).toString), (setAttributeId,staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, cast(attributeId#43 as int), true, false, true)), (setSourceType,cast(sourceType#49 as string).toString), (setUserId,cast(userId#51 as string).toString), (setAdId,cast(adId#40 as string).toString))
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdId, true, false, true) AS adId#40, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdSource, true, false, true) AS adSource#41, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAdType, true, false, true) AS adType#42, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getAttributeId.intValue AS attributeId#43, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getBotRef.intValue AS botRef#44, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getBroadcastId, true, false, true) AS broadcastId#45, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getBroadcastResponseType, true, false, true) AS broadcastResponseType#46, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getConversionType, true, false, true) AS conversionType#47, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getFlowId.intValue AS flowId#48, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getSourceType, true, false, true) AS sourceType#49, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getTimestamp, true, false, true) AS timestamp#50, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, com.engati.analytics.etl.extract.models.ConversionDTO, true])).getUserId, true, false, true) AS userId#51]
+- MapElements com.engati.analytics.etl.pipeline.impl.TestJobPipelineImpl$$Lambda$1641/1836606934@1e93834b, class java.lang.String, [StructField(value,StringType,true)], obj#39: com.engati.analytics.etl.extract.models.ConversionDTO
+- DeserializeToObject cast(value#21 as string).toString, obj#38: java.lang.String
+- Project [cast(value#8 as string) AS value#21]
+- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@48bd89fe, KafkaV2[Subscribe[com.engati.write.user.journey.conversion.data.to.db]]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.sql.AnalysisException: years(timestamp) is not currently supported
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:71)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:71)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$2(DistributionAndOrderingUtils.scala:45)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:95)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:91)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:83)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:717)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:706)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
... 1 more
24/04/22 12:15:45 INFO SparkContext: Invoking stop() from shutdown hook
24/04/22 12:15:45 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/04/22 12:15:45 INFO SparkUI: Stopped Spark web UI at http://ip-10-12-72-49.ap-south-1.compute.internal:4040
24/04/22 12:15:45 INFO StandaloneSchedulerBackend: Shutting down all executors
24/04/22 12:15:45 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint: Asking each executor to shut down
24/04/22 12:15:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/04/22 12:15:45 INFO MemoryStore: MemoryStore cleared
24/04/22 12:15:45 INFO BlockManager: BlockManager stopped
24/04/22 12:15:45 INFO BlockManagerMaster: BlockManagerMaster stopped
24/04/22 12:15:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/04/22 12:15:45 INFO SparkContext: Successfully stopped SparkContext
24/04/22 12:15:45 INFO ShutdownHookManager: Shutdown hook called
24/04/22 12:15:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-bd10ae64-0bf2-4638-a070-2074fe0aeef7
24/04/22 12:15:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-75a1686e-6c8f-483b-9923-0d09834ccbd3`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] Fokko commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1597704921
The stack trace:
```
23/06/19 17:45:53 ERROR MicroBatchExecution: Query [id = 2507e68d-0fb2-4aa2-98e4-427bfa1326c5, runId = 308060c0-6572-4328-9588-ece2b00c054a] terminated with error
org.apache.spark.sql.AnalysisException: days(ts) is not currently supported
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$toCatalyst$1(V2ExpressionUtils.scala:64)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.toCatalyst(V2ExpressionUtils.scala:64)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.$anonfun$prepareQuery$1(DistributionAndOrderingUtils.scala:36)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$.prepareQuery(DistributionAndOrderingUtils.scala:36)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:93)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:39)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
```
Spark 3.2 uses a V2 data source.
[GitHub] [iceberg] FabricioZGalvani commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "FabricioZGalvani (via GitHub)" <gi...@apache.org>.
FabricioZGalvani commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1688947126
Hello, my friends.
I'm using Structured Streaming to write to a partitioned table that is partitioned with Iceberg's transforms.
Writing with `.start()` gives me the same error, but calling `toTable('iceberg_table')` instead of `start()` worked.
I am using Iceberg 1.3.0 and Spark 3.4.0.
This way didn't work:
```
df.writeStream.format("iceberg").outputMode("append").trigger(
    once=True
).option("path", iceberg_table).option("fanout-enabled", "true").option(
    "checkpointLocation",
    checkpoint_location,
).start().awaitTermination()
```
This way it worked:
```
df.writeStream.format("iceberg").outputMode("append").trigger(
    once=True
).option("path", iceberg_table).option("fanout-enabled", "true").option(
    "checkpointLocation",
    checkpoint_location,
).toTable(
    iceberg_table
).awaitTermination()
```
[GitHub] [iceberg] arundh93 commented on issue #7226: Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions
Posted by "arundh93 (via GitHub)" <gi...@apache.org>.
arundh93 commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-1702381647
> Hello, my friends.
>
> I'm using Structured Streaming to write to a partitioned table that is partitioned with Iceberg's transforms.
>
> Writing with `.start()` gives me the same error, but calling `toTable('iceberg_table')` instead of `start()` worked.
>
> I am using Iceberg 1.3.0 and Spark 3.4.0.
>
> This way didn't work:
>
> ```
> df.writeStream.format("iceberg").outputMode("append").trigger(
>     once=True
> ).option("path", iceberg_table).option("fanout-enabled", "true").option(
>     "checkpointLocation",
>     checkpoint_location,
> ).start().awaitTermination()
> ```
>
> This way it worked:
>
> ```
> df.writeStream.format("iceberg").outputMode("append").trigger(
>     once=True
> ).option("path", iceberg_table).option("fanout-enabled", "true").option(
>     "checkpointLocation",
>     checkpoint_location,
> ).toTable(
>     iceberg_table
> ).awaitTermination()
> ```
`toTable` works for me as well on Spark 3.4.
However, it fails on a multi-executor setup.
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "stevenlii (via GitHub)" <gi...@apache.org>.
stevenlii commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2047156687
> We've found a workaround in our use case. (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).
>
> Our previous flow was:
>
> ```
> TABLE = "glue_catalog.<database>.<table>"
>
> # set up readStream
> read_stream = spark.readStream.format(
>     <setup read stream>
> .load()
>
> # dataframe operations
> df = read_stream.select(
>     <various dataframe operations>
> )
>
> # setup write stream
> write_stream = df.writeStream.format("iceberg").outputMode("append").trigger(
>     processingTime=job_args["TRIGGER_PROCESSING_TIME"]
> ).options(**{
>     "fanout-enabled": job_args["FANOUT_ENABLED"],
>     "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
> }).toTable(TABLE)
> ```
>
> which always failed on the insert with the above-described error.
>
> Our new flow is to use `foreachBatch`:
>
> ```
> def process_batch(df, batch_id):
>     df = df.select(
>         <various dataframe operations>
>     )
>
>     df.writeTo(TABLE).append()
>
>
> read_stream.writeStream.foreachBatch(process_batch).start()
> ```
>
> The above is for completeness, as we're actually using Glue's inbuilt [`GlueContext.forEachBatch`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch), but it [does exactly the same thing](https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L602).
>
> This is no longer failing. We're able to write to the table with partition transforms (we're using `hour()` to partition our data).
>
> Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a [random string](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout)), where previously this wasn't happening.
>
> It would be nice to use the inbuilt write triggers as described [in the docs](https://iceberg.apache.org/docs/latest/spark-structured-streaming/#streaming-writes), but we are happy with a working solution, and this allows us to add MERGE behaviour with SQL.
>
> Hope someone else finds this useful!
But this way there is no checkpointLocation. How do you manage the offset?
Re: [I] Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions [iceberg]
Posted by "greg-roberts-bbc (via GitHub)" <gi...@apache.org>.
greg-roberts-bbc commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2049509117
@stevenlii
> But this way there is no checkpointLocation. How do you manage the offset?
As I said, we're using `GlueContext.forEachBatch` which allows you to specify the checkpoint location as follows:
```
glue_context.forEachBatch(
    frame=read_stream,
    batch_function=process_batch,
    options={
        "windowSize": job_args["TRIGGER_PROCESSING_TIME"],
        "fanout-enabled": job_args["FANOUT_ENABLED"],
        "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
        "persistDataFrame": "false",
    }
)
```
The equivalent call to `writeStream.foreachBatch` (note the lowercase `e` in the PySpark method name) would be something like:
`read_stream.writeStream.foreachBatch(process_batch).trigger(processingTime=windowSize).option("checkpointLocation", checkpointLocation)`
as can be seen in the Glue source code I linked above: https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L641
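For anyone not on Glue, here is a rough sketch of what the plain PySpark version of this workaround might look like, with the checkpoint location set on the stream writer so that offsets are still tracked. The helper name `start_stream` and its parameter names are hypothetical and not from this thread; it assumes `read_stream` is a streaming DataFrame and `table` is the Iceberg table identifier.

```python
def process_batch(batch_df, batch_id, table):
    """Append one micro-batch to the table using the batch (non-streaming) writer."""
    batch_df.writeTo(table).append()


def start_stream(read_stream, table, checkpoint_location, processing_time):
    """Start a foreachBatch query; offsets are tracked under checkpoint_location.

    `start_stream` and its parameters are illustrative names, not an Iceberg or Glue API.
    """
    return (
        read_stream.writeStream
        # foreachBatch passes (DataFrame, batch_id); bind the table name via a lambda
        .foreachBatch(lambda df, batch_id: process_batch(df, batch_id, table))
        .trigger(processingTime=processing_time)
        .option("checkpointLocation", checkpoint_location)
        .start()
    )
```

Called as, for example, `start_stream(read_stream, TABLE, job_args["CHECKPOINT_LOCATION"], job_args["TRIGGER_PROCESSING_TIME"]).awaitTermination()`. Since each micro-batch goes through `writeTo(...).append()`, this takes the batch write path rather than the streaming write path that raises the `days(ts) is not currently supported` error above.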