Posted to issues@spark.apache.org by "Genmao Yu (JIRA)" <ji...@apache.org> on 2019/05/14 09:21:00 UTC

[jira] [Commented] (SPARK-26278) V2 Streaming sources cannot be written to V1 sinks

    [ https://issues.apache.org/jira/browse/SPARK-26278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839246#comment-16839246 ] 

Genmao Yu commented on SPARK-26278:
-----------------------------------

[~jpolchlo] Could you please close this JIRA issue? It has been fixed.

> V2 Streaming sources cannot be written to V1 sinks
> --------------------------------------------------
>
>                 Key: SPARK-26278
>                 URL: https://issues.apache.org/jira/browse/SPARK-26278
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Justin Polchlopek
>            Priority: Major
>
> Starting from a streaming DataFrame derived from a custom v2 MicroBatch reader, we have
> {code:scala}
> import org.apache.spark.sql.DataFrame
>
> val df: DataFrame = ... // streaming DataFrame from the custom v2 MicroBatch reader
> assert(df.isStreaming)
> val outputFormat = "orc" // also fails with "csv" and "json", but not with "console"
> df.writeStream
>   .format(outputFormat)
>   .option("checkpointLocation", "/tmp/checkpoints")
>   .option("path", "/tmp/result")
>   .start()
> {code}
> This code fails with the following stack trace:
> {code:java}
> 2018-12-04 08:24:27 ERROR MicroBatchExecution:91 - Query [id = 193f97bf-8064-4658-8aa6-0f481919eafe, runId = e96ed7e5-aaf4-4ef4-a3f3-05fe0b01a715] terminated with error
> java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {code}
> I'm filing this issue at the suggestion of [~mojodna], who believes the problem could be resolved by backporting the streaming sinks from Spark 2.4.0.


