Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2024/01/11 09:17:10 UTC

[PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

HeartSaVioR opened a new pull request, #44688:
URL: https://github.com/apache/spark/pull/44688

   ### What changes were proposed in this pull request?
   
   This PR fixes a bug in canonicalizing a plan that contains the physical node for dropDuplicatesWithinWatermark (`StreamingDeduplicateWithinWatermarkExec`).
   
   ### Why are the changes needed?
   
   Canonicalization replaces the expressions in the plan (including attributes) to strip cosmetic details, including names "and metadata".
   
   `StreamingDeduplicateWithinWatermarkExec` assumes that the input attributes of its child node contain the event time column, and it resolves that column when the node instance is initialized. Once canonicalization is triggered, the child node's attributes lose the event time metadata. Canonicalization then copies `StreamingDeduplicateWithinWatermarkExec`, instantiating a new node with the new child, which no longer exposes an event time column, so instantiation fails.
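   
   As a rough illustration of this failure mode (not Spark code), the sketch below models it in plain Scala. `Attr`, `Node`, `Leaf`, `EagerDedup`, `stripMetadata`, and the `delayMs` key are hypothetical stand-ins for attributes, plan nodes, canonicalization, and the event time metadata key.
   
   ```scala
   import scala.util.Try

   object EagerInitSketch {
     // Hypothetical stand-in for an attribute; metadata marks the event time column.
     case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

     sealed trait Node { def output: Seq[Attr] }
     case class Leaf(output: Seq[Attr]) extends Node

     // Made-up key for this sketch (assumption, not Spark's actual key).
     val DelayKey = "delayMs"

     // Mirrors the pre-fix shape: the event time column is resolved eagerly,
     // so constructing the node throws when no attribute carries the metadata.
     case class EagerDedup(child: Node) extends Node {
       def output: Seq[Attr] = child.output
       private val eventTimeCol: Attr =
         child.output.find(_.metadata.contains(DelayKey)).get
       val delayThresholdMs: Long = eventTimeCol.metadata(DelayKey)
     }

     // Toy "canonicalization": drops names and metadata from every attribute.
     def stripMetadata(node: Leaf): Leaf =
       Leaf(node.output.map(_ => Attr("none")))

     val child: Leaf =
       Leaf(Seq(Attr("_1"), Attr("eventTime", Map(DelayKey -> 10000L))))
     val original: EagerDedup = EagerDedup(child)

     // Copying the node onto the stripped child re-runs the eager initializers,
     // and the `.get` on the missing event time column throws.
     val canonicalizedCopy: Try[EagerDedup] =
       Try(original.copy(child = stripMetadata(child)))
   }
   ```
   
   In this toy model, `original` is constructed fine, while `canonicalizedCopy` is a `Failure`, because the copy's constructor cannot find an attribute carrying the metadata.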
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New UT added.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1452013986


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##########
@@ -199,4 +199,27 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest {
       )
     }
   }
+
+  test("SPARK-46676: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS()
+        .withColumn("eventTime", timestamp_seconds($"_2"))
+        .withWatermark("eventTime", "10 second")
+        .dropDuplicatesWithinWatermark("_1")
+        .select($"_1", $"eventTime".cast("long").as[Long])
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+

Review Comment:
   Good point.





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1891204121

   cc. @zsxwing @viirya @anishshri-db @cloud-fan Friendly reminder.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1894889923

   If some attribute metadata in streaming is not cosmetic, shall we keep it during canonicalization?




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895865683

   OK, it looks like we have a case where propagating event time metadata on canonicalization won't help. Given that the current fix handles that case as well, shall we keep the current fix as it is?




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #44688: [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan
URL: https://github.com/apache/spark/pull/44688




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1899550091

   Thanks all for reviewing! Merging to master/3.5.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1893095472

   GA failure is just a linter issue - Sphinx version.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1888379367

   Some optimization rules are applied to the physical plan to reuse shuffles/subqueries on the batch side across microbatches (they are unlikely to change between microbatches unless the batch source has been updated). One example is ReuseExchangeAndSubquery.
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala
   
   This rule uses the canonicalized plan to compare against a cached subtree (plan). It does not require the plans to be exactly the same, only that they are identical once cosmetic details are ignored.
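   
   A minimal sketch of that idea in plain Scala, assuming a toy plan type: `Plan`, its `label` field, and `reuseOrRegister` are hypothetical names for illustration and are not the actual ReuseExchangeAndSubquery implementation.
   
   ```scala
   import scala.collection.mutable

   object ReuseSketch {
     // Hypothetical plan node: `label` is cosmetic, `op` and `children` are semantic.
     case class Plan(op: String, label: String, children: Seq[Plan] = Nil) {
       // Toy canonical form: blank out the cosmetic label throughout the subtree.
       def canonicalized: Plan = Plan(op, "", children.map(_.canonicalized))
     }

     // Cache keyed by canonical form, so subtrees that differ only cosmetically
     // map to the same entry.
     private val cache = mutable.Map.empty[Plan, Plan]

     // Returns the previously registered equivalent subtree if there is one,
     // otherwise registers and returns the given plan.
     def reuseOrRegister(plan: Plan): Plan =
       cache.getOrElseUpdate(plan.canonicalized, plan)

     val first: Plan = Plan("Exchange", "batch-1", Seq(Plan("Scan", "t#1")))
     val second: Plan = Plan("Exchange", "batch-2", Seq(Plan("Scan", "t#7")))

     val firstResult: Plan = reuseOrRegister(first)   // registers `first`
     val secondResult: Plan = reuseOrRegister(second) // finds `first` and reuses it
   }
   ```
   
   Here `first` and `second` are not equal as written, but their canonical forms are, so the second lookup returns the cached `first` instance.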




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895307863

   @cloud-fan 
   Ah OK. Thanks for the suggestion. I can look into how to propagate event time metadata only. Will ping you again once I'm done with it.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1887742890

   As the finder of the bug repro and a curious learner of Catalyst, I'd really appreciate it if anyone could tell me:
   1. Where is the canonicalized Spark plan constructed in this simple test case, and why do we even bother to create such a plan for Spark physical plans?
   2. Why does the query run successfully without the select (i.e. `Project`)? It should still go through this canonicalization process, right?
   Thanks in advance!




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1452012084


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##########
@@ -199,4 +199,27 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest {
       )
     }
   }
+
+  test("SPARK-46676: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS()
+        .withColumn("eventTime", timestamp_seconds($"_2"))
+        .withWatermark("eventTime", "10 second")
+        .dropDuplicatesWithinWatermark("_1")
+        .select($"_1", $"eventTime".cast("long").as[Long])
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+

Review Comment:
   Nit: should we skip the newline?





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1457520532


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)

Review Comment:
   Done with lazy val, please refer to commit [f36735f](https://github.com/apache/spark/pull/44688/commits/f36735f0f7c736dd50a3bb9a26a4fd99824c6983)





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1456774055


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)

Review Comment:
   shall we simply make them `lazy val`?
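   
   To show why `lazy val` would sidestep the problem, here is a small plain-Scala sketch. It does not use Spark: `Attr`, `Node`, `Leaf`, `LazyDedup`, and the `delayMs` key are hypothetical stand-ins, and the "stripped" child only imitates what canonicalization does to attribute metadata.
   
   ```scala
   object LazyInitSketch {
     // Hypothetical stand-in for an attribute; metadata marks the event time column.
     case class Attr(name: String, metadata: Map[String, Long] = Map.empty)

     sealed trait Node { def output: Seq[Attr] }
     case class Leaf(output: Seq[Attr]) extends Node

     // Made-up key for this sketch (assumption, not Spark's actual key).
     val DelayKey = "delayMs"

     case class LazyDedup(child: Node) extends Node {
       def output: Seq[Attr] = child.output
       // Evaluated on first access only, so construction never touches the metadata.
       private lazy val eventTimeCol: Attr =
         child.output.find(_.metadata.contains(DelayKey)).get
       lazy val delayThresholdMs: Long = eventTimeCol.metadata(DelayKey)
     }

     val child: Leaf =
       Leaf(Seq(Attr("_1"), Attr("eventTime", Map(DelayKey -> 10000L))))
     // Imitates a canonicalized child: names and metadata are gone.
     val stripped: Leaf = Leaf(child.output.map(_ => Attr("none")))

     val original: LazyDedup = LazyDedup(child)
     // Construction on the stripped child succeeds; only an actual access
     // to delayThresholdMs on this copy would throw.
     val canonicalizedCopy: LazyDedup = original.copy(child = stripped)
   }
   ```
   
   The trade-off in this sketch is that a missing event time column is reported at first use rather than at construction, which is acceptable as long as the canonicalized copy is only compared and never executed.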





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1456781515


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)

Review Comment:
   BTW, we can also check `isCanonicalizedPlan` and do something different for the canonicalized plan.





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1892978420

   @anishshri-db Thanks for reviewing!
   
   cc. @zsxwing @viirya @cloud-fan @HyukjinKwon Mind having a quick look given @anishshri-db reviewed this already? Thanks in advance!




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1899540620

   @cloud-fan Mind having another look, please? The failure in GA is only from doc generation (sphinx version), which is a known one.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895702492

   I'm not really sure I understand the logic correctly.
   
   ```
   val dedupeInputData = MemoryStream[(String, Int)]
   val dedupe = dedupeInputData.toDS()
     .withColumn("eventTime", timestamp_seconds($"_2"))
     .withWatermark("eventTime", "10 second")
     .dropDuplicatesWithinWatermark("_1")
     .select($"_1", $"eventTime".cast("long").as[Long])
   ```
   
   ```
   20:59:20.593 WARN org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec: CALL [withNewChildInternal] newChild: *(1) !Project [none#0, timestamp_seconds(none#1) AS #0]
   ```
   
   The above Project represents withColumn in the query above. Do we expect exprId 0 to be used "twice"? They refer to different columns but use the same exprId.
   (FYI, eventTime in EventTimeWatermarkExec is canonicalized as `none#1`.)
   
   You can change EventTimeWatermarkExec and run the new test in this PR to reproduce.
   
   ```
     override protected def withNewChildInternal(newChild: SparkPlan): EventTimeWatermarkExec = {
       val metadataMap = newChild.output.map { attr =>
         attr.name -> attr.metadata
       }
       logWarning(s"CALL [withNewChildInternal] newChild: $newChild, metadata: $metadataMap")
       copy(child = newChild)
     }
   ```




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1456922256


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)

Review Comment:
   Ah OK, that's good to know. Probably either way would simply work. Will try that out. Thanks!





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1452016953


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+  // Below three variables are Option as attributes in child won't have an event time column
+  // in the canonicalized plan. The executed plan will eventually call doExecute() and we can
+  // defer assertion of the existence of event time column at that time.
+  private val eventTimeColOpt: Option[Attribute] = WatermarkSupport.findEventTimeColumn(
+    child.output, allowMultipleEventTimeColumns = false)
+  private val delayThresholdMsOpt: Option[Long] = eventTimeColOpt.map(
+    _.metadata.getLong(EventTimeWatermark.delayKey))
+  private val eventTimeColOrdinalOpt: Option[Int] = eventTimeColOpt.map(child.output.indexOf)
+
+  // Below three variables will be set lazily when doExecute() is called.

Review Comment:
   I guess this is only possible for this operator. But do you think there is a way to enforce this at the operator level so that future operators won't miss this case?





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895641773

   @cloud-fan 
   It does not seem to be an easy fix; I'm having a hard time debugging this:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
   
   eventTime in the constructor does not seem to match child.output via semanticEquals. Does canonicalization take care of keeping child.output and the attributes referring to child.output in sync? If not, I expect the fix to be very involved, and the current fix in this PR sounds like a reasonable workaround to me.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895653609

   Yes, see https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L606-L610




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895721464

   There are two kinds of expr IDs: 1) to reference attributes from children. 2) to indicate the output of the plan node.
   
   After canonicalization, the output expr IDs for each plan node (if it has output columns, such as Project) are normalized to 0, 1, 2, .... The reference expr IDs are normalized to the ordinal of the matching column among the children's output columns. So ideally they match, as ordinal 0 means the first output column, whose expr ID should also be 0. However, it's possible that an attribute is both a reference and an output, e.g. `Project(a#1 AS aa#3, b#2)`. So it seems we can't find the event time column on a canonicalized plan.
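   
   The collision can be reproduced with a deliberately simplified model in plain Scala. This is not Spark's canonicalization code: `AttrRef`, `Call`, `Alias`, `canonicalizeProjectList`, and the input expr IDs 10, 11, 12 are hypothetical. The model assumes that aliases receive fresh output IDs 0, 1, 2, ... and that pass-through references are rewritten to the ordinal of the matching child output column.
   
   ```scala
   object ExprIdSketch {
     sealed trait Expr
     // Reference to an attribute, identified by exprId.
     case class AttrRef(name: String, exprId: Int) extends Expr
     // Function call over one argument, e.g. timestamp_seconds.
     case class Call(fn: String, arg: Expr) extends Expr
     // Alias defines a new output attribute with its own exprId.
     case class Alias(child: Expr, name: String, exprId: Int) extends Expr

     // References are rewritten to the ordinal of the matching child output column.
     def normalizeRefs(e: Expr, childOutputIds: Seq[Int]): Expr = e match {
       case AttrRef(_, id)  => AttrRef("none", childOutputIds.indexOf(id))
       case Call(fn, arg)   => Call(fn, normalizeRefs(arg, childOutputIds))
       case Alias(c, n, id) => Alias(normalizeRefs(c, childOutputIds), n, id)
     }

     // Toy canonicalization of a project list: aliases get fresh output IDs
     // 0, 1, 2, ... while pass-through references keep their child ordinal.
     def canonicalizeProjectList(
         projectList: Seq[Expr],
         childOutputIds: Seq[Int]): Seq[Expr] = {
       var nextOutputId = -1
       projectList.map {
         case Alias(child, _, _) =>
           nextOutputId += 1
           Alias(normalizeRefs(child, childOutputIds), "", nextOutputId)
         case other => normalizeRefs(other, childOutputIds)
       }
     }

     // Expr ID under which a top-level project expression is exposed as output.
     def outputId(e: Expr): Option[Int] = e match {
       case AttrRef(_, id)  => Some(id)
       case Alias(_, _, id) => Some(id)
       case _               => None
     }

     // Models withColumn("eventTime", timestamp_seconds($"_2"))
     // over a child whose output is [_1#10, _2#11] (IDs are made up).
     val canonical: Seq[Expr] = canonicalizeProjectList(
       Seq(
         AttrRef("_1", 10),
         Alias(Call("timestamp_seconds", AttrRef("_2", 11)), "eventTime", 12)),
       childOutputIds = Seq(10, 11))
   }
   ```
   
   Under these assumptions the project list becomes the equivalent of `[none#0, timestamp_seconds(none#1) AS #0]`: the pass-through `_1` and the aliased `eventTime` both end up with ID 0, so an ID lookup on the node's output is ambiguous.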




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1886709869

   cc. @zsxwing @viirya @anishshri-db Please take a look.
   Also cc. @cloud-fan Maybe this does not require streaming expertise, hence asking for a review.
   
   Thanks in advance!




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1452024043


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+  // Below three variables are Option as attributes in child won't have an event time column
+  // in the canonicalized plan. The executed plan will eventually call doExecute() and we can
+  // defer assertion of the existence of event time column at that time.
+  private val eventTimeColOpt: Option[Attribute] = WatermarkSupport.findEventTimeColumn(
+    child.output, allowMultipleEventTimeColumns = false)
+  private val delayThresholdMsOpt: Option[Long] = eventTimeColOpt.map(
+    _.metadata.getLong(EventTimeWatermark.delayKey))
+  private val eventTimeColOrdinalOpt: Option[Int] = eventTimeColOpt.map(child.output.indexOf)
+
+  // Below three variables will be set lazily when doExecute() is called.

Review Comment:
   We already leverage a trait for watermark handling; see WatermarkSupport. I don't see a way to enforce this for operator-specific changes.





Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895212887

   > Do you suggest retaining metadata selectively, or keep all metadata?
   
   I mean the metadata that is not cosmetic, and I think event time metadata is one of them.




Re: [PR] [SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on canonicalization of the plan [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44688:
URL: https://github.com/apache/spark/pull/44688#issuecomment-1895124586

   @cloud-fan Do you suggest retaining metadata selectively, or keep all metadata? For event time column metadata I guess it's unlikely to have a side effect, but I'm not sure about propagating all metadata.

