You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "sahnib (via GitHub)" <gi...@apache.org> on 2023/10/24 22:05:51 UTC

[PR] [SPARK-45655] Mark CurrentBatchTimestamp as deterministic. [spark]

sahnib opened a new pull request, #43517:
URL: https://github.com/apache/spark/pull/43517

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   This PR removes the `NonDeterministic` trait from `CurrentBatchTimestamp` expression. `CurrentBatchTimestamp` is deterministic and its value is decided in the offset log during WAL and won’t change in case of retrial.
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   Streaming queries do not support current_date() inside CollectMetrics. The primary reason is that current_date() (resolves to CurrentBatchTimestamp) is marked as non-deterministic. However, current_date and current_timestamp are both deterministic today, and current_batch_timestamp should be the same.
   
   As an example, the query below fails (without this change) due to observe call on the DataFrame.
   
   ```
   
   val inputData = MemoryStream[Timestamp]
   
   inputData.toDF()
         .filter("value < current_date()")
         .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
         .writeStream
         .queryName("ts_metrics_test")
         .format("memory")
         .outputMode("append")
         .start()
   
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   No
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   Unit test cases added.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1390349996


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:
##########
@@ -794,9 +794,20 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     // No columns
     assert(!CollectMetrics("evt", Nil, testRelation, 0).resolved)
 
-    def checkAnalysisError(exprs: Seq[NamedExpression], errors: String*): Unit = {

Review Comment:
   This hasn't seem to be used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SS] Mark CurrentBatchTimestamp as deterministic [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1371968318


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")
+      .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+      .writeStream
+      .queryName("ts_metrics_test")
+      .format("memory")
+      .outputMode("append")
+      .start()
+
+    val timeNow = Instant.now()
+
+    // this value would be accepted by the filter and would not count towards
+    // dropped metrics.
+    val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+    inputData.addData(validValue)
+
+    // would be dropped by the filter and count towards dropped metrics
+    inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+    query.processAllAvailable()
+
+    val dropped = query.recentProgress.map(p => {
+      val metricVal = Option(p.observedMetrics.get("metrics"))
+      metricVal.map(_.getLong(0)).getOrElse(0L)
+    }).sum
+
+    query.stop()

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #43517: [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics
URL: https://github.com/apache/spark/pull/43517


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SS] Mark `CurrentBatchTimestamp` as deterministic [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43517:
URL: https://github.com/apache/spark/pull/43517#issuecomment-1784610509

   (I'm sorry to miss leaving a note in prior.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43517:
URL: https://github.com/apache/spark/pull/43517#issuecomment-1807047223

   @sahnib Thanks for your first contribution to Apache Spark! I merged this to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43517:
URL: https://github.com/apache/spark/pull/43517#issuecomment-1803246795

   Please check `SPARK-45655: Use current batch timestamp in observe API *** FAILED *** (241 milliseconds)`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SS] Mark CurrentBatchTimestamp as deterministic [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1370931246


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")
+      .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+      .writeStream
+      .queryName("ts_metrics_test")
+      .format("memory")
+      .outputMode("append")
+      .start()
+
+    val timeNow = Instant.now()
+
+    // this value would be accepted by the filter and would not count towards
+    // dropped metrics.
+    val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+    inputData.addData(validValue)
+
+    // would be dropped by the filter and count towards dropped metrics
+    inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+    query.processAllAvailable()
+
+    val dropped = query.recentProgress.map(p => {

Review Comment:
   nit: style, you can omit ( and ) via changing this to `{ p =>`



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")
+      .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+      .writeStream
+      .queryName("ts_metrics_test")
+      .format("memory")
+      .outputMode("append")
+      .start()
+
+    val timeNow = Instant.now()
+
+    // this value would be accepted by the filter and would not count towards
+    // dropped metrics.
+    val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+    inputData.addData(validValue)
+
+    // would be dropped by the filter and count towards dropped metrics
+    inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+    query.processAllAvailable()
+
+    val dropped = query.recentProgress.map(p => {
+      val metricVal = Option(p.observedMetrics.get("metrics"))
+      metricVal.map(_.getLong(0)).getOrElse(0L)
+    }).sum
+
+    query.stop()

Review Comment:
   nit: Let's make related code be near to each other.
   
   ```
   query.processAllAvailable()
   query.stop()
   
   val dropped = ...blabla...
   assert(dropped == 1)
   
   val data = ...blabla...
   assert(data(0).getAs[Timestamp](0).equals(validValue))
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {

Review Comment:
   Let's make it more explicit - Use current batch timestamp in observe API. We can omit streaming query as the test suite is for streaming query.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")

Review Comment:
   Let's add code comment that current_date() internally uses current batch timestamp on streaming query. Many people may not understand about the relationship between current_date() and current batch timestamp.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SS] Mark CurrentBatchTimestamp as deterministic [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1371967970


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")

Review Comment:
   Done.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {

Review Comment:
   Done



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     )
   }
 
+  test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+    import testImplicits._
+
+    val inputData = MemoryStream[Timestamp]
+
+    val query = inputData.toDF()
+      .filter("value < current_date()")
+      .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+      .writeStream
+      .queryName("ts_metrics_test")
+      .format("memory")
+      .outputMode("append")
+      .start()
+
+    val timeNow = Instant.now()
+
+    // this value would be accepted by the filter and would not count towards
+    // dropped metrics.
+    val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+    inputData.addData(validValue)
+
+    // would be dropped by the filter and count towards dropped metrics
+    inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+    query.processAllAvailable()
+
+    val dropped = query.recentProgress.map(p => {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on PR #43517:
URL: https://github.com/apache/spark/pull/43517#issuecomment-1806461097

   > Please check `SPARK-45655: Use current batch timestamp in observe API *** FAILED *** (241 milliseconds)`
   
   This test was failing because CI machine has milliseconds precision of 8 decimal points, I have fixed the testcase. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SS] Mark `CurrentBatchTimestamp` as deterministic [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43517:
URL: https://github.com/apache/spark/pull/43517#issuecomment-1784608374

   FYI, the previous test failure was relevant to this change, and @sahnib is looking into it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43517:
URL: https://github.com/apache/spark/pull/43517#issuecomment-1807046684

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45655][SQL][SS] Allow non-deterministic expressions inside AggregateFunctions in CollectMetrics [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1390488268


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala:
##########
@@ -794,9 +794,20 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     // No columns
     assert(!CollectMetrics("evt", Nil, testRelation, 0).resolved)
 
-    def checkAnalysisError(exprs: Seq[NamedExpression], errors: String*): Unit = {

Review Comment:
   Correct. This was unused, so I removed it.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org