Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2023/10/25 00:02:08 UTC

Re: [PR] [SPARK-45655][SS] Mark CurrentBatchTimestamp as deterministic [spark]

HeartSaVioR commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1370931246


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")
+      .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+      .writeStream
+      .queryName("ts_metrics_test")
+      .format("memory")
+      .outputMode("append")
+      .start()
+
+    val timeNow = Instant.now()
+
+    // this value would be accepted by the filter and would not count towards
+    // dropped metrics.
+    val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+    inputData.addData(validValue)
+
+    // would be dropped by the filter and count towards dropped metrics
+    inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+    query.processAllAvailable()
+
+    val dropped = query.recentProgress.map(p => {

Review Comment:
   nit (style): you can drop the outer `(` and `)` by changing this to `.map { p =>`.
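
   For illustration, a minimal sketch of the two equivalent forms. `Progress` is a hypothetical stand-in for the real progress type (the actual test reads the metric with `_.getLong(0)` on a row), so only the lambda syntax is the point here:

   ```scala
   object MapStyleExample {
     // Hypothetical stand-in for a progress record; not the Spark class.
     final case class Progress(observedMetrics: java.util.Map[String, java.lang.Long])

     val recentProgress: Array[Progress] = Array(
       Progress(java.util.Collections.singletonMap("metrics", java.lang.Long.valueOf(1L))),
       Progress(java.util.Collections.emptyMap[String, java.lang.Long]())
     )

     // Before: a block lambda wrapped in parentheses.
     val droppedBefore: Long = recentProgress.map(p => {
       val metricVal = Option(p.observedMetrics.get("metrics"))
       metricVal.map(_.longValue).getOrElse(0L)
     }).sum

     // After: braces alone delimit the lambda, so the parentheses go away.
     val droppedAfter: Long = recentProgress.map { p =>
       val metricVal = Option(p.observedMetrics.get("metrics"))
       metricVal.map(_.longValue).getOrElse(0L)
     }.sum
   }
   ```

   Both forms compile to the same call; the second is just the more common style for multi-line lambdas.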



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")
+      .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+      .writeStream
+      .queryName("ts_metrics_test")
+      .format("memory")
+      .outputMode("append")
+      .start()
+
+    val timeNow = Instant.now()
+
+    // this value would be accepted by the filter and would not count towards
+    // dropped metrics.
+    val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+    inputData.addData(validValue)
+
+    // would be dropped by the filter and count towards dropped metrics
+    inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+    query.processAllAvailable()
+
+    val dropped = query.recentProgress.map(p => {
+      val metricVal = Option(p.observedMetrics.get("metrics"))
+      metricVal.map(_.getLong(0)).getOrElse(0L)
+    }).sum
+
+    query.stop()

Review Comment:
   nit: Let's keep related code close together, so each value is computed right before the assertion that checks it.
   
   ```
   query.processAllAvailable()
   query.stop()
   
   val dropped = ...blabla...
   assert(dropped == 1)
   
   val data = ...blabla...
   assert(data(0).getAs[Timestamp](0).equals(validValue))
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {

Review Comment:
   Let's make the test name more explicit, e.g. "Use current batch timestamp in observe API". We can omit "streaming query", since this test suite is already about streaming queries.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")

Review Comment:
   Let's add a code comment noting that current_date() internally uses the current batch timestamp in a streaming query. Many readers may not be aware of the relationship between current_date() and the current batch timestamp.
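
   To make the relationship concrete, here is a rough plain-Scala model of the idea, not Spark's implementation. It assumes one timestamp is fixed per micro-batch, that the date cutoff is the start of that day in UTC, and that every row in the batch is compared against the same cutoff. `batchDateStart` and `splitBatch` are hypothetical names used only for this sketch:

   ```scala
   import java.sql.Timestamp
   import java.time.{Instant, ZoneOffset}
   import java.time.temporal.ChronoUnit

   object BatchTimestampSketch {
     // Hypothetical helper: start of the batch's calendar day (UTC assumed for simplicity).
     def batchDateStart(batchTime: Instant): Instant =
       batchTime.atZone(ZoneOffset.UTC).toLocalDate.atStartOfDay(ZoneOffset.UTC).toInstant

     // Split rows into (kept, dropped) using a single cutoff for the whole batch,
     // modelling a filter like "value < current_date()".
     def splitBatch(batchTime: Instant, rows: Seq[Timestamp]): (Seq[Timestamp], Seq[Timestamp]) = {
       val cutoff = batchDateStart(batchTime)
       rows.partition(_.toInstant.isBefore(cutoff))
     }

     // Fixed instant so the example is reproducible; the real test uses Instant.now().
     val batchTime: Instant = Instant.parse("2023-10-25T00:02:08Z")
     val validValue: Timestamp = Timestamp.from(batchTime.minus(2, ChronoUnit.DAYS))
     val futureValue: Timestamp = Timestamp.from(batchTime.plus(2, ChronoUnit.DAYS))

     val (kept, dropped) = splitBatch(batchTime, Seq(validValue, futureValue))
   }
   ```

   Because the cutoff is computed once per batch, the decision for a given row does not depend on when within the batch it is evaluated, which is the property the code comment should call out.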



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

For additional commands, e-mail: reviews-help@spark.apache.org