You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/24 02:56:21 UTC

[GitHub] [spark] cloud-fan commented on a diff in pull request #38361: [SPARK-40892][SQL][SS] Loosen the requirement of window_time rule - allow multiple window_time calls

cloud-fan commented on code in PR #38361:
URL: https://github.com/apache/spark/pull/38361#discussion_r1002847203


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala:
##########
@@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
       val windowTimeExpressions =
         p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
 
-      if (windowTimeExpressions.size == 1 &&
-        windowTimeExpressions.head.windowColumn.resolved &&
-        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+      val allWindowTimeExprsResolved = windowTimeExpressions.forall { w =>
+        w.windowColumn.resolved && w.checkInputDataTypes().isSuccess
+      }
 
-        val windowTime = windowTimeExpressions.head
+      if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
+        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>
+          val metadata = windowTime.windowColumn match {
+            case a: Attribute => a.metadata
+            case _ => Metadata.empty
+          }
 
-        val metadata = windowTime.windowColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
+          if (!metadata.contains(TimeWindow.marker) &&
+            !metadata.contains(SessionWindow.marker)) {
+            // FIXME: error framework?
+            throw new AnalysisException(
+              "The input is not a correct window column: $windowTime", plan = Some(p))
+          }
 
-        if (!metadata.contains(TimeWindow.marker) &&
-          !metadata.contains(SessionWindow.marker)) {
-          // FIXME: error framework?
-          throw new AnalysisException(
-            "The input is not a correct window column: $windowTime", plan = Some(p))
-        }
+          val newMetadata = new MetadataBuilder()
+            .withMetadata(metadata)
+            .remove(TimeWindow.marker)
+            .remove(SessionWindow.marker)
+            .build()
 
-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .remove(TimeWindow.marker)
-          .remove(SessionWindow.marker)
-          .build()
+          val colName = windowTime.sql

Review Comment:
   Will this be materialized in the checkpoint or state store? The SQL string for an expression is unstable: it depends on the resolved expression, and resolution may change over time (e.g., type coercion may add casts differently).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala:
##########
@@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
       val windowTimeExpressions =
         p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
 
-      if (windowTimeExpressions.size == 1 &&
-        windowTimeExpressions.head.windowColumn.resolved &&
-        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+      val allWindowTimeExprsResolved = windowTimeExpressions.forall { w =>
+        w.windowColumn.resolved && w.checkInputDataTypes().isSuccess
+      }
 
-        val windowTime = windowTimeExpressions.head
+      if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
+        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>
+          val metadata = windowTime.windowColumn match {
+            case a: Attribute => a.metadata
+            case _ => Metadata.empty
+          }
 
-        val metadata = windowTime.windowColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
+          if (!metadata.contains(TimeWindow.marker) &&
+            !metadata.contains(SessionWindow.marker)) {
+            // FIXME: error framework?
+            throw new AnalysisException(
+              "The input is not a correct window column: $windowTime", plan = Some(p))
+          }
 
-        if (!metadata.contains(TimeWindow.marker) &&
-          !metadata.contains(SessionWindow.marker)) {
-          // FIXME: error framework?
-          throw new AnalysisException(
-            "The input is not a correct window column: $windowTime", plan = Some(p))
-        }
+          val newMetadata = new MetadataBuilder()
+            .withMetadata(metadata)
+            .remove(TimeWindow.marker)
+            .remove(SessionWindow.marker)
+            .build()
 
-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .remove(TimeWindow.marker)
-          .remove(SessionWindow.marker)
-          .build()
+          val colName = windowTime.sql

Review Comment:
   Will this be materialized in the checkpoint or state store? The SQL string for an expression is unstable: it depends on the resolved expression, and resolution may change over time (e.g., type coercion may add casts differently).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org