You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Seth Fitzsimmons (JIRA)" <ji...@apache.org> on 2018/08/27 22:32:00 UTC

[jira] [Commented] (SPARK-25257) v2 MicroBatchReaders can't resume from checkpoints

    [ https://issues.apache.org/jira/browse/SPARK-25257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594298#comment-16594298 ] 

Seth Fitzsimmons commented on SPARK-25257:
------------------------------------------

I traced this a bit further; {{OffsetSeq.toStreamProgress}} looks like it might be the right place to deserialize. {{deserialize.patch}} deserializes if the source is a {{MicroBatchReader}} (i.e. has a {{deserializeOffset}} method).

{{runBatch}} seems partially right (it's specific to {{MicroBatchReader}}s), but {{available}} (variable in scope) may be either a {{SerializedOffset}} or {{Offset}} depending whether a checkpoint is being resumed or if it's starting from a clean slate.

> v2 MicroBatchReaders can't resume from checkpoints
> --------------------------------------------------
>
>                 Key: SPARK-25257
>                 URL: https://issues.apache.org/jira/browse/SPARK-25257
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Seth Fitzsimmons
>            Priority: Major
>         Attachments: deserialize.patch
>
>
> When resuming from a checkpoint:
> {code:java}
> writeStream.option("checkpointLocation", "/tmp/checkpoint").format("console").start
> {code}
> The stream reader fails with:
> {noformat}
> osmesa.common.streaming.AugmentedDiffMicroBatchReader@59e19287
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> 	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
> 	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> 	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> 	... 1 more
> {noformat}
> The root cause appears to be that the `SerializedOffset` (JSON, from disk) is never deserialized; I would expect to see something along the lines of `reader.deserializeOffset(off.json)` here (unless `available` is intended to be deserialized elsewhere):
> https://github.com/apache/spark/blob/4dc82259d81102e0cb48f4cb2e8075f80d899ac4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L405



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org