You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/06/04 17:46:00 UTC

[jira] [Commented] (SPARK-24462) Text socket micro-batch reader throws error when a query is restarted with saved state

    [ https://issues.apache.org/jira/browse/SPARK-24462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500606#comment-16500606 ] 

Apache Spark commented on SPARK-24462:
--------------------------------------

User 'arunmahadevan' has created a pull request for this issue:
https://github.com/apache/spark/pull/21490

> Text socket micro-batch reader throws error when a query is restarted with saved state
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-24462
>                 URL: https://issues.apache.org/jira/browse/SPARK-24462
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Arun Mahadevan
>            Priority: Critical
>
> Exception thrown:
>  
> {noformat}
> scala> 18/06/01 22:47:04 ERROR MicroBatchExecution: Query [id = 0bdc4428-5d21-4237-9d64-898ae65f28f3, runId = f6822423-2bd2-47c1-8ed6-799d1c481195] terminated with error
> java.lang.RuntimeException: Offsets committed out of order: 2 followed by -1
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchReader.commit(socket.scala:197)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$2$$anonfun$apply$mcV$sp$5.apply(MicroBatchExecution.scala:377)
>  
> {noformat}
>  
> Sample code that reproduces the error on restarting the query.
>  
> {code:java}
>  
> import java.sql.Timestamp
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import spark.implicits._
> import org.apache.spark.sql.streaming.Trigger
> val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", true).load()
> val words = lines.as[(String, Timestamp)].flatMap(line => line._1.split(" ").map(word => (word, line._2))).toDF("word", "timestamp")
> val windowedCounts = words.groupBy(window($"timestamp", "20 minutes", "20 minutes"), $"word").count().orderBy("window")
> val query = windowedCounts.writeStream.outputMode("complete").option("checkpointLocation", "/tmp/debug").format("console").option("truncate", "false").start()
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org