You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sandish Kumar HN (Jira)" <ji...@apache.org> on 2020/09/30 18:37:00 UTC
[jira] [Created] (SPARK-33039) Misleading watermark calculation in
Structured Streaming
Sandish Kumar HN created SPARK-33039:
----------------------------------------
Summary: Misleading watermark calculation in Structured Streaming
Key: SPARK-33039
URL: https://issues.apache.org/jira/browse/SPARK-33039
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.4.4
Reporter: Sandish Kumar HN
source code:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
/**
 * Minimal reproduction for SPARK-33039: a watermarked (5 s delay), 10-second tumbling-window
 * count over semicolon-separated CSV files dropped into a local directory, printed to the console.
 *
 * Three files are written in turn (2019 events, then 2020 events, then 2017 events). After each
 * one is processed, the `eventTime` section of the last progress report (min/avg/max/watermark)
 * is printed.
 *
 * Note: per the Structured Streaming programming guide, watermarks only drop late data and evict
 * aggregation state in the "append" and "update" output modes. "complete" mode must retain every
 * aggregate, so the 2017 rows are still counted there. Pass "update" as the first argument to
 * compare the two behaviours.
 *
 * This uses an explicit `main` instead of `extends App` because Spark's documentation warns that
 * subclasses of `scala.App` may not work correctly.
 */
object TestWaterMark {
  // Local import keeps this block self-contained; the file header does not import it.
  import java.nio.charset.StandardCharsets

  /**
   * Runs the reproduction, then stops the query and SparkSession and removes the input directory.
   *
   * @param args optional; `args(0)` is the console sink's output mode (default "complete", as in
   *             the original report)
   */
  def main(args: Array[String]): Unit = {
    val outputMode = args.headOption.getOrElse("complete")

    val spark = SparkSession.builder().master("local").getOrCreate()
    val sc = spark.sparkContext
    val dir = new Path("/tmp/test-structured-streaming")
    val fs = dir.getFileSystem(sc.hadoopConfiguration)
    // Remove leftovers from an earlier (possibly aborted) run so that batch 0 only sees file1.
    fs.delete(dir, true)
    fs.mkdirs(dir)

    // Writes `data` as plain UTF-8 text. Not `writeUTF`: that prepends a 2-byte length header
    // (and uses modified UTF-8), which the CSV reader would parse as an extra junk row.
    def writeFile(path: Path, data: String): Unit = {
      val out = fs.create(path)
      try out.write(data.getBytes(StandardCharsets.UTF_8))
      finally out.close()
    }

    val schema = StructType(
      StructField("value", StringType) ::
        StructField("timestamp", TimestampType) ::
        Nil)

    val eventStream = spark.readStream
      .option("sep", ";")
      .option("header", "false")
      .schema(schema)
      .csv(dir.toString)

    // Watermarked aggregation: count events per 10-second window, tolerating 5 s of lateness.
    val eventsCount = eventStream
      .withWatermark("timestamp", "5 seconds")
      .groupBy(window(col("timestamp"), "10 seconds"))
      .count()

    // Debug query
    val query = eventsCount.writeStream
      .format("console")
      .outputMode(outputMode)
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    try {
      writeFile(new Path(dir, "file1"), """
        |OLD;2019-08-09 10:05:00
        |OLD;2019-08-09 10:10:00
        |OLD;2019-08-09 10:15:00""".stripMargin)
      query.processAllAvailable()
      val lp1 = query.lastProgress
      println(lp1.eventTime)

      writeFile(new Path(dir, "file2"), """
        |NEW;2020-08-29 10:05:00
        |NEW;2020-08-29 10:10:00
        |NEW;2020-08-29 10:15:00""".stripMargin)
      query.processAllAvailable()
      val lp2 = query.lastProgress
      println(lp2.eventTime)

      // Events older than the watermark established by the previous batches.
      writeFile(new Path(dir, "file4"), """
        |OLD;2017-08-10 10:05:00
        |OLD;2017-08-10 10:10:00
        |OLD;2017-08-10 10:15:00""".stripMargin)
      writeFile(new Path(dir, "file3"), "")
      query.processAllAvailable()
      val lp3 = query.lastProgress
      println(lp3.eventTime)
    } finally {
      // The original blocked in awaitTermination() forever, making this cleanup unreachable.
      query.stop()
      fs.delete(dir, true)
      spark.stop()
    }
  }
}
{code}
OUTPUT:
{code:java}
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
+------------------------------------------+-----+
{min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
|[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
|[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
+------------------------------------------+-----+
{min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1 |
|[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1 |
|[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1 |
|[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1 |
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1 |
|[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1 |
|[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1 |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1 |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1 |
+------------------------------------------+-----+
{min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z}
{code}
EXPECTED:
Expected the events in the last batch to be dropped: their timestamps (2017-08-10) are older than the watermark (2019-08-09T17:14:55.000Z), so they should count as late data.
Events expected to be dropped:
|OLD;2017-08-10 10:05:00
|OLD;2017-08-10 10:10:00
|OLD;2017-08-10 10:15:00
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org