Posted to issues@spark.apache.org by "Aoyuan Liao (Jira)" <ji...@apache.org> on 2020/10/05 23:19:00 UTC
[jira] [Commented] (SPARK-33039) Misleading watermark calculation in structure streaming
[ https://issues.apache.org/jira/browse/SPARK-33039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208409#comment-17208409 ]
Aoyuan Liao commented on SPARK-33039:
-------------------------------------
This is not a bug; it is documented behavior. The Structured Streaming programming guide states:
"
It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries _(as of Spark 2.1.1, subject to change in the future)_.
 * *Output mode must be Append or Update*
"
(see [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking])
Your query writes its output in Complete mode, which must preserve all aggregated data, so the watermark is not used to drop late events or to clean up state.
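To make the difference concrete, here is a minimal, dependency-free sketch of the behavior described above. It is a simplified model under stated assumptions, not Spark's implementation: the object `WatermarkModel`, its `run` method, and the `dropLate` flag are hypothetical names introduced only for illustration. The model assumes the watermark is the maximum event time seen so far minus the 5-second delay, that it takes effect from the next batch, and that a row counts as late when the end of its window is not after the watermark. In the reported query, the corresponding change would be replacing `.outputMode("complete")` with `.outputMode("update")` (or `"append"`), after which the 2017 rows should be dropped.

```scala
import java.time.{Duration, Instant}

// Hypothetical, simplified model of event-time watermarking (not Spark code).
object WatermarkModel {
  val delay: Duration = Duration.ofSeconds(5)       // withWatermark("timestamp", "5 seconds")
  val windowSize: Duration = Duration.ofSeconds(10) // window(col("timestamp"), "10 seconds")

  // Event times of the three input files, in UTC as shown in the progress output.
  val batches: Seq[Seq[Instant]] = Seq(
    Seq("2019-08-09T17:05:00Z", "2019-08-09T17:10:00Z", "2019-08-09T17:15:00Z"),
    Seq("2020-08-29T17:05:00Z", "2020-08-29T17:10:00Z", "2020-08-29T17:15:00Z"),
    Seq("2017-08-10T17:05:00Z", "2017-08-10T17:10:00Z", "2017-08-10T17:15:00Z")
  ).map(_.map(s => Instant.parse(s)))

  // Start of the tumbling window that contains `t`.
  def windowStart(t: Instant): Instant = {
    val size = windowSize.getSeconds
    Instant.ofEpochSecond(Math.floorDiv(t.getEpochSecond, size) * size)
  }

  // Processes the batches in order and returns the count for every window
  // that accepted at least one row.
  // dropLate = false models Complete mode (nothing is ever dropped);
  // dropLate = true models Append/Update mode (late rows are filtered out).
  def run(input: Seq[Seq[Instant]], dropLate: Boolean): Map[Instant, Long] = {
    var watermark = Instant.EPOCH
    var counts = Map.empty[Instant, Long]
    for (batch <- input) {
      val accepted =
        if (dropLate) batch.filter(t => windowStart(t).plus(windowSize).isAfter(watermark))
        else batch
      for (t <- accepted) {
        val w = windowStart(t)
        counts = counts.updated(w, counts.getOrElse(w, 0L) + 1L)
      }
      // Advance the watermark for the next batch; it never moves backwards.
      if (batch.nonEmpty) {
        val maxEventTime = Instant.ofEpochSecond(batch.map(_.getEpochSecond).max)
        val candidate = maxEventTime.minus(delay)
        if (candidate.isAfter(watermark)) watermark = candidate
      }
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    println(run(batches, dropLate = false).size) // 9 windows: the 2017 rows are kept
    println(run(batches, dropLate = true).size)  // 6 windows: the 2017 rows are dropped
  }
}
```

With `dropLate = false` the model keeps all nine windows, matching the Batch 2 output in the report. The model does not cover state eviction or which rows each output mode emits per trigger; it only illustrates when late input is discarded.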
> Misleading watermark calculation in structure streaming
> -------------------------------------------------------
>
> Key: SPARK-33039
> URL: https://issues.apache.org/jira/browse/SPARK-33039
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.4
> Reporter: Sandish Kumar HN
> Priority: Major
>
> source code:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import java.sql.Timestamp
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> object TestWaterMark extends App {
>   val spark = SparkSession.builder().master("local").getOrCreate()
>   val sc = spark.sparkContext
>   // Directory watched by the file-based streaming source
>   val dir = new Path("/tmp/test-structured-streaming")
>   val fs = dir.getFileSystem(sc.hadoopConfiguration)
>   fs.mkdirs(dir)
>   val schema = StructType(StructField("value", StringType) ::
>     StructField("timestamp", TimestampType) ::
>     Nil)
>   val eventStream = spark
>     .readStream
>     .option("sep", ";")
>     .option("header", "false")
>     .schema(schema)
>     .csv(dir.toString)
>   // Watermarked aggregation
>   val eventsCount = eventStream
>     .withWatermark("timestamp", "5 seconds")
>     .groupBy(window(col("timestamp"), "10 seconds"))
>     .count()
>   // Writes one input file; note that writeUTF prepends a 2-byte length header
>   def writeFile(path: Path, data: String): Unit = {
>     val file = fs.create(path)
>     file.writeUTF(data)
>     file.close()
>   }
>   // Debug query
>   val query = eventsCount.writeStream
>     .format("console")
>     .outputMode("complete")
>     .option("truncate", "false")
>     .trigger(Trigger.ProcessingTime("5 seconds"))
>     .start()
>   writeFile(new Path(dir, "file1"), """
>     |OLD;2019-08-09 10:05:00
>     |OLD;2019-08-09 10:10:00
>     |OLD;2019-08-09 10:15:00""".stripMargin)
>   query.processAllAvailable()
>   val lp1 = query.lastProgress
>   println(lp1.eventTime)
>   writeFile(new Path(dir, "file2"), """
>     |NEW;2020-08-29 10:05:00
>     |NEW;2020-08-29 10:10:00
>     |NEW;2020-08-29 10:15:00""".stripMargin)
>   query.processAllAvailable()
>   val lp2 = query.lastProgress
>   println(lp2.eventTime)
>   writeFile(new Path(dir, "file4"), """
>     |OLD;2017-08-10 10:05:00
>     |OLD;2017-08-10 10:10:00
>     |OLD;2017-08-10 10:15:00""".stripMargin)
>   writeFile(new Path(dir, "file3"), "")
>   query.processAllAvailable()
>   val lp3 = query.lastProgress
>   println(lp3.eventTime)
>   query.awaitTermination()
>   fs.delete(dir, true)
> }
> {code}
> OUTPUT:
>
> {code:java}
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +------------------------------------------+-----+
> |window |count|
> +------------------------------------------+-----+
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +------------------------------------------+-----+
> {min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +------------------------------------------+-----+
> |window |count|
> +------------------------------------------+-----+
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +------------------------------------------+-----+
> {min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +------------------------------------------+-----+
> |window |count|
> +------------------------------------------+-----+
> |[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
> |[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
> |[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
> |[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
> |[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
> |[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 |
> |[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
> |[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
> |[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
> +------------------------------------------+-----+
> {min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z}
> {code}
> EXPECTED:
> The events in the last batch were expected to be dropped, since they are older than the watermark 2019-08-09T17:14:55.000Z.
> Events expected to be dropped:
> |OLD;2017-08-10 10:05:00
> |OLD;2017-08-10 10:10:00
> |OLD;2017-08-10 10:15:00