Posted to issues@spark.apache.org by "Saisai Shao (JIRA)" <ji...@apache.org> on 2018/04/02 08:13:00 UTC

[jira] [Created] (SPARK-23844) Socket Stream recovering from checkpoint will throw exception

Saisai Shao created SPARK-23844:
-----------------------------------

             Summary: Socket Stream recovering from checkpoint will throw exception
                 Key: SPARK-23844
                 URL: https://issues.apache.org/jira/browse/SPARK-23844
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Saisai Shao


When a checkpoint location is specified for a query that uses the socket source, the query throws an exception when it is rerun:
{noformat}
18/04/02 14:11:28 ERROR MicroBatchExecution: Query test [id = c5ca82b2-550b-4c3d-9127-869f1aeae477, runId = 552d5bd4-a7e7-44e5-a85a-2f04f666ff6a] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 0 followed by -1
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:196)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:373)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:370)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:370)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:142)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:135)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:135)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
{noformat}
Basically, this means that {{TextSocketMicroBatchReader}} is honoring the offsets recovered from the checkpoint. This is not correct for the socket source, because it doesn't support recovering from a checkpoint: even though the offset is recovered, the data actually held by the source no longer corresponds to that offset.
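
To illustrate the mismatch, here is a minimal, self-contained Scala sketch. It is a toy model only: {{InMemoryLineSource}} and {{RestartDemo}} are hypothetical names, this is not Spark's actual implementation, and the offsets it reports differ from the ones in the log above. It only assumes a source that keeps its data and offset counters in memory, so a restarted instance numbers its data from the beginning while the engine still replays the offset saved in the checkpoint.

{code:scala}
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Hypothetical stand-in for a non-replayable, in-memory source (not Spark code).
final class InMemoryLineSource {
  // Lines buffered by this instance; nothing here survives a restart.
  private val buffer = ListBuffer.empty[String]
  // Offset of the newest received line; -1 means "nothing received yet".
  private var current: Long = -1L
  // Offset of the last committed line; -1 means "nothing committed yet".
  private var lastCommitted: Long = -1L

  def receive(line: String): Unit = {
    buffer += line
    current += 1
  }

  def latestOffset: Long = current

  // Drop committed lines and reject commits that move backwards.
  def commit(end: Long): Unit = {
    val diff = (end - lastCommitted).toInt
    if (diff < 0) {
      throw new RuntimeException(
        s"Offsets committed out of order: $lastCommitted followed by $end")
    }
    buffer.remove(0, math.min(diff, buffer.size))
    lastCommitted = end
  }
}

object RestartDemo {
  // Returns the error message raised on the second run, if any.
  def run(): Option[String] = {
    // First run: two lines arrive and are committed; the checkpoint keeps offset 1.
    val firstRun = new InMemoryLineSource
    firstRun.receive("a")
    firstRun.receive("b")
    val checkpointed = firstRun.latestOffset
    firstRun.commit(checkpointed)

    // Second run: a fresh instance whose counters start again from -1.
    val secondRun = new InMemoryLineSource
    // The recovered offset is replayed and accepted, although no data backs it.
    secondRun.commit(checkpointed)
    // A new line arrives and gets offset 0, which is behind the recovered offset.
    secondRun.receive("c")
    Try(secondRun.commit(secondRun.latestOffset)).failed.toOption.map(_.getMessage)
  }
}
{code}

In this model, {{RestartDemo.run()}} returns the out-of-order message because the restarted source's own offsets fall behind the recovered one. That is the same kind of failure as in the stack trace, although the exact sequence inside Spark may differ.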

To reproduce this issue:
{code:java}
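// Assumes a spark-shell session (where `spark` is predefined) and a server listening on localhost:9999, e.g. `nc -lk 9999`.
val socket = spark.readStream.format("socket").options(Map("host" -> "localhost", "port" -> "9999")).load
// Use a fixed checkpoint location so that the second run recovers the offsets written by the first.
spark.conf.set("spark.sql.streaming.checkpointLocation", "./checkpoint")
socket.writeStream.format("parquet").option("path", "./result").queryName("test").start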
{code}
This fails on the second run.

Though this source is not supported in production environments, I think we should still make sure it works in test environments without throwing a runtime exception.

 



