You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Genmao Yu (JIRA)" <ji...@apache.org> on 2019/05/14 09:21:00 UTC
[jira] [Commented] (SPARK-26278) V2 Streaming sources cannot be
written to V1 sinks
[ https://issues.apache.org/jira/browse/SPARK-26278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839246#comment-16839246 ]
Genmao Yu commented on SPARK-26278:
-----------------------------------
[~jpolchlo] Could you please close this jira? This issue has been fixed.
> V2 Streaming sources cannot be written to V1 sinks
> --------------------------------------------------
>
> Key: SPARK-26278
> URL: https://issues.apache.org/jira/browse/SPARK-26278
> Project: Spark
> Issue Type: Bug
> Components: Input/Output, Structured Streaming
> Affects Versions: 2.3.2
> Reporter: Justin Polchlopek
> Priority: Major
>
> Starting from a streaming DataFrame derived from a custom v2 MicroBatch reader, we have
> {code:java}
> val df: DataFrame = ...
> assert(df.isStreaming)
> val outputFormat = "orc" // also applies to "csv" and "json" but not "console"
> df.writeStream
> .format(outputFormat)
> .option("checkpointLocation", "/tmp/checkpoints")
> .option("path", "/tmp/result")
> .start
> {code}
> This code fails with the following stack trace:
> {code:java}
> 2018-12-04 08:24:27 ERROR MicroBatchExecution:91 - Query [id = 193f97bf-8064-4658-8aa6-0f481919eafe, runId = e96ed7e5-aaf4-4ef4-a3f3-05fe0b01a715] terminated with error
> java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code}
> I'm filing this issue on the suggestion of [~mojodna] who suggests that this problem could be resolved by backporting streaming sinks from spark 2.4.0.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org