Posted to issues@spark.apache.org by "Sandish Kumar HN (Jira)" <ji...@apache.org> on 2020/09/30 18:37:00 UTC

[jira] [Created] (SPARK-33039) Misleading watermark calculation in Structured Streaming

Sandish Kumar HN created SPARK-33039:
----------------------------------------

             Summary: Misleading watermark calculation in Structured Streaming
                 Key: SPARK-33039
                 URL: https://issues.apache.org/jira/browse/SPARK-33039
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.4
            Reporter: Sandish Kumar HN


Source code:
{code:java}

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object TestWaterMark extends App {

  val spark = SparkSession.builder().master("local").getOrCreate()
  val sc = spark.sparkContext
  val dir = new Path("/tmp/test-structured-streaming")
  val fs = dir.getFileSystem(sc.hadoopConfiguration)
  fs.mkdirs(dir)

  val schema = StructType(StructField("value", StringType) ::
    StructField("timestamp", TimestampType) ::
    Nil)

  // File source: every new file dropped into `dir` becomes part of the next micro-batch
  val eventStream = spark
    .readStream
    .option("sep", ";")
    .option("header", "false")
    .schema(schema)
    .csv(dir.toString)

  // Watermarked aggregation: 5-second delay threshold, 10-second tumbling windows
  val eventsCount = eventStream
    .withWatermark("timestamp", "5 seconds")
    .groupBy(window(col("timestamp"), "10 seconds"))
    .count()

  // Helper to drop a file into the source directory
  def writeFile(path: Path, data: String): Unit = {
    val file = fs.create(path)
    file.writeUTF(data)
    file.close()
  }

  // Debug query
  val query = eventsCount.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

  // Batch 0: events from 2019-08-09
  writeFile(new Path(dir, "file1"), """
    |OLD;2019-08-09 10:05:00
    |OLD;2019-08-09 10:10:00
    |OLD;2019-08-09 10:15:00""".stripMargin)

  query.processAllAvailable()
  val lp1 = query.lastProgress
  println(lp1.eventTime)

  // Batch 1: events from 2020-08-29, which advance the watermark
  writeFile(new Path(dir, "file2"), """
    |NEW;2020-08-29 10:05:00
    |NEW;2020-08-29 10:10:00
    |NEW;2020-08-29 10:15:00""".stripMargin)

  query.processAllAvailable()
  val lp2 = query.lastProgress
  println(lp2.eventTime)

  // Batch 2: events from 2017-08-10, far behind the current watermark
  writeFile(new Path(dir, "file4"), """
    |OLD;2017-08-10 10:05:00
    |OLD;2017-08-10 10:10:00
    |OLD;2017-08-10 10:15:00""".stripMargin)
  writeFile(new Path(dir, "file3"), "")

  query.processAllAvailable()
  val lp3 = query.lastProgress
  println(lp3.eventTime)

  query.awaitTermination()
  fs.delete(dir, true)

}

{code}

OUTPUT:

{code:java}
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1    |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1    |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1    |
+------------------------------------------+-----+
{min=2019-08-09T17:05:00.000Z, avg=2019-08-09T17:10:00.000Z, watermark=1970-01-01T00:00:00.000Z, max=2019-08-09T17:15:00.000Z}
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1    |
|[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1    |
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1    |
|[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1    |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1    |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1    |
+------------------------------------------+-----+
{min=2020-08-29T17:05:00.000Z, avg=2020-08-29T17:10:00.000Z, watermark=2019-08-09T17:14:55.000Z, max=2020-08-29T17:15:00.000Z}
-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|[2017-08-10 10:15:00, 2017-08-10 10:15:10]|1    |
|[2020-08-29 10:15:00, 2020-08-29 10:15:10]|1    |
|[2017-08-10 10:05:00, 2017-08-10 10:05:10]|1    |
|[2020-08-29 10:10:00, 2020-08-29 10:10:10]|1    |
|[2019-08-09 10:05:00, 2019-08-09 10:05:10]|1    |
|[2017-08-10 10:10:00, 2017-08-10 10:10:10]|1    |
|[2020-08-29 10:05:00, 2020-08-29 10:05:10]|1    |
|[2019-08-09 10:15:00, 2019-08-09 10:15:10]|1    |
|[2019-08-09 10:10:00, 2019-08-09 10:10:10]|1    |
+------------------------------------------+-----+
{min=2017-08-10T17:05:00.000Z, avg=2017-08-10T17:10:00.000Z, watermark=2020-08-29T17:14:55.000Z, max=2017-08-10T17:15:00.000Z}
{code}
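
For reference, the watermark values reported above are consistent with the documented rule that the watermark used for a trigger is the maximum event time seen in earlier triggers minus the delay threshold. A minimal sketch of that arithmetic (plain Scala, no Spark required; the Instant values are copied from the eventTime lines above):
{code:java}
import java.time.{Duration, Instant}

// Delay threshold from withWatermark("timestamp", "5 seconds")
val delay = Duration.ofSeconds(5)

// Max event time reported for Batch 0 yields the watermark reported in Batch 1
val maxBatch0 = Instant.parse("2019-08-09T17:15:00.000Z")
assert(maxBatch0.minus(delay) == Instant.parse("2019-08-09T17:14:55.000Z"))

// Max event time reported for Batch 1 yields the watermark reported in Batch 2
val maxBatch1 = Instant.parse("2020-08-29T17:15:00.000Z")
assert(maxBatch1.minus(delay) == Instant.parse("2020-08-29T17:14:55.000Z"))
{code}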

EXPECTED:
The events in the last batch were expected to be dropped, since their timestamps are far behind the watermark (2019-08-09T17:14:55.000Z after Batch 0's data, and 2020-08-29T17:14:55.000Z by the time Batch 2 runs). Instead, they still appear in the Batch 2 output.

Events expected to be dropped (see the check after the list below):
 |OLD;2017-08-10 10:05:00
 |OLD;2017-08-10 10:10:00
 |OLD;2017-08-10 10:15:00
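
For concreteness, every one of these timestamps is well behind the watermark reported in the Batch 2 progress, which is why they were expected to be filtered out of the aggregation. A small self-contained check (plain Scala; the UTC instants correspond to the min/avg/max values in the Batch 2 eventTime output):
{code:java}
import java.time.Instant

// Watermark reported in the Batch 2 progress above
val watermarkBatch2 = Instant.parse("2020-08-29T17:14:55.000Z")

// The three 2017 events, as the UTC instants shown in the Batch 2 eventTime output
val lateEvents = Seq(
  Instant.parse("2017-08-10T17:05:00.000Z"),
  Instant.parse("2017-08-10T17:10:00.000Z"),
  Instant.parse("2017-08-10T17:15:00.000Z")
)

// All of them are older than the watermark, yet Batch 2 still counts them
assert(lateEvents.forall(_.isBefore(watermarkBatch2)))
{code}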


