Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/04/02 09:10:00 UTC

[jira] [Assigned] (SPARK-23844) Socket Stream recovering from checkpoint will throw exception

     [ https://issues.apache.org/jira/browse/SPARK-23844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23844:
------------------------------------

    Assignee:     (was: Apache Spark)

> Socket Stream recovering from checkpoint will throw exception
> -------------------------------------------------------------
>
>                 Key: SPARK-23844
>                 URL: https://issues.apache.org/jira/browse/SPARK-23844
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Saisai Shao
>            Priority: Major
>
> When a checkpoint location is specified and the socket streaming source is used, the query throws an exception after a rerun:
> {noformat}
> 18/04/02 14:11:28 ERROR MicroBatchExecution: Query test [id = c5ca82b2-550b-4c3d-9127-869f1aeae477, runId = 552d5bd4-a7e7-44e5-a85a-2f04f666ff6a] terminated with error
> java.lang.RuntimeException: Offsets committed out of order: 0 followed by -1
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:196)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:373)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:370)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:370)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:353)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:142)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:135)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){noformat}
> Basically, {{TextSocketMicroBatchReader}} honors the offsets recovered from the checkpoint. This is not correct for the socket source, which does not support recovery from a checkpoint: even though the offset is recovered, the actual data no longer matches that offset.
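> The sequence on restart appears to be: the offset recovered from the offset log (0) is committed first, but the restarted reader begins counting again from -1, so its next batch ends at -1, and committing -1 after 0 trips the ordering check in {{TextSocketMicroBatchReader.commit}} (socket.scala:196). A minimal, self-contained model of that ordering check, illustrating the failing sequence (not the actual Spark source):
> {code:scala}
> object CommitOrderModel {
>   // A fresh TextSocketMicroBatchReader starts with lastOffsetCommitted = -1.
>   private var lastOffsetCommitted: Long = -1L
>
>   def commit(end: Long): Unit = {
>     // Committing an offset lower than the last committed one is rejected.
>     if (end - lastOffsetCommitted < 0) {
>       sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
>     }
>     lastOffsetCommitted = end
>   }
>
>   def main(args: Array[String]): Unit = {
>     commit(0L)  // restart: the offset recovered from the checkpoint is committed first
>     commit(-1L) // the restarted reader's next batch ends at -1 -> RuntimeException
>   }
> }
> {code}
> Running this prints exactly the message seen in the log above.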
> To reproduce this issue,
> {code:scala}
> // Socket source reading from localhost:9999.
> val socket = spark.readStream.format("socket").options(Map("host" -> "localhost", "port" -> "9999")).load
> // Fixed checkpoint location, picked up again on the next run.
> spark.conf.set("spark.sql.streaming.checkpointLocation", "./checkpoint")
> // Start the query, writing results to ./result as parquet.
> socket.writeStream.format("parquet").option("path", "./result").queryName("test").start
> {code}
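> (The socket source needs a listener on the port, e.g. {{nc -lk 9999}} in another terminal, before the query starts.)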
> This fails on the second run.
> Though this source is not intended for production environments, I think we should still make sure it works in a test environment without throwing a runtime exception.
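> Until this is fixed, one way to keep a test setup running (a workaround I am assuming, not part of this report) is to discard the checkpoint directory between runs, at the cost of losing recovery entirely:
> {code:scala}
> // Hypothetical test-only workaround: delete ./checkpoint before restarting,
> // so the socket source starts fresh instead of replaying recovered offsets.
> import scala.reflect.io.Directory
> import java.io.File
>
> new Directory(new File("./checkpoint")).deleteRecursively()
> {code}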
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org