You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Aoyuan Liao (Jira)" <ji...@apache.org> on 2020/10/05 23:19:00 UTC

[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming

    [ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208409#comment-17208409 ] 

Aoyuan Liao commented on SPARK-33039:
-------------------------------------

It is not a bug. It is documented that

"

It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries _(as of Spark 2.1.1, subject to change in the future)_.
 * *Output mode must be Append or Update*

" in [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking.]

In your case, the data is output in complete mode so all aggregated data are preserved.

> Misleading watermark calculation in structure streaming
> -------------------------------------------------------
>
>                 Key: SPARK-33039
>                 URL: https://issues.apache.org/jira/browse/SPARK-33039
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>            Reporter: Sandish Kumar HN
>            Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
> object TestWaterMark extends App {
>  val spark = SparkSession.builder().master("local").getOrCreate()
>  val sc = spark.sparkContext
>  val dir = new Path("/tmp/test-structured-streaming")
>  val fs = dir.getFileSystem(sc.hadoopConfiguration)
>  fs.mkdirs(dir)
>  val schema = StructType(StructField("vilue", StringType) ::
>  StructField("timestamp", TimestampType) ::
>  Nil)
>  val eventStream = spark
>  .readStream
>  .option("sep", ";")
>  .option("header", "false")
>  .schema(schema)
>  .csv(dir.toString)
>  // Watermarked aggregation
>  val eventsCount = eventStream
>  .withWatermark("timestamp", "5 seconds")
>  .groupBy(window(col("timestamp"), "10 seconds"))
>  .count
>  def writeFile(path: Path, data: String) {
>  val file = fs.create(path)
>  file.writeUTF(data)
>  file.close()
>  }
>  // Debug query
>  val query = eventsCount.writeStream
>  .format("console")
>  .outputMode("complete")
>  .option("truncate", "false")
>  .trigger(Trigger.ProcessingTime("5 seconds"))
>  .start()
>  writeFile(new Path(dir, "file1"), """
>  |OLD;2019-08-09 10:05:00
>  |OLD;2019-08-09 10:10:00
>  |OLD;2019-08-09 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp1 = query.lastProgress
>  println(lp1.eventTime)
>  writeFile(new Path(dir, "file2"), """
>  |NEW;2020-08-29 10:05:00
>  |NEW;2020-08-29 10:10:00
>  |NEW;2020-08-29 10:15:00""".stripMargin)
>  query.processAllAvailable()
>  val lp2 = query.lastProgress
>  println(lp2.eventTime)
>  writeFile(new Path(dir, "file4"), """
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00""".stripMargin)
>  writeFile(new Path(dir, "file3"), "")
>  query.processAllAvailable()
>  val lp3 = query.lastProgress
>  println(lp3.eventTime)
>  query.awaitTermination()
>  fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>  
> {code:java}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +------------------------------------------+-----+
> |window |count|
> +------------------------------------------+-----+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +------------------------------------------+-----+
> {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +------------------------------------------+-----+
> |window |count|
> +------------------------------------------+-----+
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +------------------------------------------+-----+
> {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +------------------------------------------+-----+
> |window |count|
> +------------------------------------------+-----+
> |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +------------------------------------------+-----+
> {min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z}
> {code}
> EXPECTED:
> expected to drop the last batch events to get dropped as the watermark is 2019-08-09T17:14:55.000Z.
> expected events to get droped:
>  |OLD;2017-08-10 10:05:00
>  |OLD;2017-08-10 10:10:00
>  |OLD;2017-08-10 10:15:00



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org