Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/07 08:29:03 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

HeartSaVioR opened a new pull request #35419:
URL: https://github.com/apache/spark/pull/35419


   ### What changes were proposed in this pull request?
   
   This PR revives `HashClusteredDistribution` and renames it to `StatefulOpClusteredDistribution` so that the purpose of the distribution is clear from its name. Renaming is safe because this class no longer needs to be a general one: in SPARK-35703 we moved the usages of `HashClusteredDistribution` over to `ClusteredDistribution`, with stateful operators being the exception.
   
   Only `HashPartitioning` with the same expressions and the same number of partitions can satisfy `StatefulOpClusteredDistribution`. That said, we cannot modify `HashPartitioning` unless we clone it and assign the clone to `StatefulOpClusteredDistribution`.
   
   This PR documents the partitioning expectations of stateful operators in the classdoc of `StatefulOpClusteredDistribution`.
   
   This PR also changes all stateful operators to use `StatefulOpClusteredDistribution` instead of `ClusteredDistribution`. Stateful operators using `ClusteredDistribution`, which has more relaxed requirements than `HashClusteredDistribution`, is a long-standing issue (Spark 2.3.0+), and this PR fixes it.
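   
   To make the difference concrete, here is a minimal sketch of the two satisfaction rules. It is a simplified, hypothetical model rather than Spark's code: `HashPart`, `satisfiesClustered`, and `satisfiesStatefulOp` are illustrative names, expressions are plain strings instead of Catalyst expressions, and the real checks handle more cases than shown here.
   
   ```scala
   // Hypothetical model: a hash partitioning over named expressions.
   final case class HashPart(expressions: Seq[String], numPartitions: Int)
   
   // Relaxed rule (ClusteredDistribution-like): every partitioning expression
   // must be one of the clustering keys, so hashing on a subset of keys passes.
   def satisfiesClustered(p: HashPart, keys: Seq[String], required: Option[Int]): Boolean =
     required.forall(_ == p.numPartitions) && p.expressions.forall(e => keys.contains(e))
   
   // Strict rule (StatefulOpClusteredDistribution-like): the same expressions in
   // the same order, and exactly the required number of partitions.
   def satisfiesStatefulOp(p: HashPart, keys: Seq[String], required: Option[Int]): Boolean =
     required.forall(_ == p.numPartitions) && p.expressions == keys
   
   val keys = Seq("userId", "window")
   val bySubset = HashPart(Seq("userId"), 200) // e.g. a user-added repartition
   val byAllKeys = HashPart(keys, 200)
   
   val relaxedAcceptsSubset = satisfiesClustered(bySubset, keys, Some(200))   // no shuffle
   val strictAcceptsSubset = satisfiesStatefulOp(bySubset, keys, Some(200))   // rejected
   val strictAcceptsAllKeys = satisfiesStatefulOp(byAllKeys, keys, Some(200)) // accepted
   ```
   
   With the relaxed rule, a user-added repartition on `userId` alone is accepted as-is; with the strict rule it is rejected, so a shuffle on the full grouping keys would be planned instead.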
   
   This PR also has to introduce some changes to Aggregate: its required child distribution differs from that of stateful operators, which may cause unexpected shuffles between Aggregate and StateStoreRestoreExec / StateStoreSaveExec. This PR makes sure the overall pipeline of `Aggregate -> StateStoreRestoreExec -> Aggregate -> StateStoreSaveExec -> Aggregate` uses the stateful operator's required child distribution.
   
   ### Why are the changes needed?
   
   Spark does not guarantee a stable physical partitioning for stateful operators across the query lifetime, and due to the relaxed distribution requirement it is hard to predict what the current physical partitioning of the state is.
   (We expect hash partitioning on the grouping keys, but ClusteredDistribution does not "guarantee" that partitioning. It is much more relaxed.)
   
   This PR enforces the physical partitioning of stateful operators to be hash partitioning on the grouping keys, which is what we generally expect of state store partitioning.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. If users have streaming queries with a checkpoint whose state is NOT partitioned via hash partitioning on the grouping keys, this change may affect query results. But we have no way to know the current partitioning of the state in the checkpoint, so this is unfortunately the best effort we can make.
   
   ### How was this patch tested?
   
   Existing tests.
   
   TODO: New tests will follow soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802045995



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       Please refer to the long comment thread https://github.com/apache/spark/pull/35419#discussion_r801162163
   
   We have to fix them, but we should have a plan to avoid silently breaking existing queries. We need more time to think through how to address queries that are "already broken". They seem to have been broken since their introduction (Spark 2.2+), so someone may even be leveraging the relaxed requirement as a "feature". Even in that case we can't simply break their query.
   
   I'll create a new JIRA ticket and/or start a discussion thread on dev@ about this. I need some time to build a plan (with options) to address this safely.



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802084404



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       > it could be possible someone is even leveraging the relaxed requirement as a "feature"
   
   Suppose a user has a stream of event logs with columns (userId, time, ...) and performs a time-window aggregation like this:
   
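   ```scala
   import org.apache.spark.sql.functions.{col, count, window}
   
   // df: a streaming DataFrame with columns (userId, time, ...)
   df
     .withWatermark("time", "1 hour")
     .groupBy(col("userId"), window(col("time"), "10 minutes"))
     .agg(count("*"))
   ```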
   
   groupBy won't trigger a shuffle for a variety of output partitionings of df, since streaming aggregation only requires ClusteredDistribution. The thing is, such a partitioning may be intentional: 1) to reduce shuffles as much as possible, or 2) to control the partitioning to deal with skew. (I can't easily imagine skew arising from applying a hash function to "grouping keys + time window", but once users see it, they will try to fix it.)
   
   Both are very risky (as of now, changing the partitioning during the query lifetime would lead to correctness issues), but they still reflect the users' intention.
   
   Furthermore, we seem to allow a data source to produce its own output partitioning, which can satisfy ClusteredDistribution. This is still very risky from the stateful operator's perspective, but if the output partitioning is guaranteed not to change, it's still a great chance to reduce (unnecessary) shuffles.
   (Just saying hypothetically; a stateful operator has to require a specific output partitioning once the state is built, so it's unlikely that we can leverage the data source's partitioning. We may find a way later, but not now.)
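   
   To illustrate why a user-chosen partitioning is risky for existing state, here is a minimal sketch that assigns grouping keys to partitions as `pmod(hash(keys), numPartitions)`, once on the full grouping key and once on `userId` only. It uses Scala's `MurmurHash3.seqHash` as a stand-in, which is an assumption: Spark's `Murmur3Hash` expression hashes values differently, so the actual partition ids will not match Spark's. `partitionId` and the sample keys are hypothetical.
   
   ```scala
   import scala.util.hashing.MurmurHash3
   
   // Hypothetical stand-in for hash partitioning: pmod(hash(keyValues), numPartitions).
   def partitionId(keyValues: Seq[Any], numPartitions: Int): Int = {
     val h = MurmurHash3.seqHash(keyValues)
     ((h % numPartitions) + numPartitions) % numPartitions
   }
   
   val numPartitions = 200
   // (userId, window start) pairs; a window is represented by its start time.
   val groupingKeys: Seq[(String, Long)] =
     for (u <- 1 to 50; w <- Seq(0L, 600L, 1200L)) yield (s"user$u", w)
   
   // Keys whose partition differs between hashing the full grouping key (what the
   // state expects) and hashing userId only (which ClusteredDistribution accepts).
   val moved = groupingKeys.count { case (user, windowStart) =>
     partitionId(Seq(user, windowStart), numPartitions) != partitionId(Seq(user), numPartitions)
   }
   ```
   
   Any key counted in `moved` would be looked up in a partition whose state store never saw it.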



[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1033167068


   Thanks! Merging to master.



[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801357151



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.

Review comment:
       Yeah, either works for me. The comment is also non-blocking for this PR, as it is a documentation improvement.



[GitHub] [spark] HeartSaVioR closed pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #35419:
URL: https://github.com/apache/spark/pull/35419


   



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801168066



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       But we don't define "adding repartition" as "unsupported operation across query lifetime", no?
   
   The thing is that once the query has run, the partitioning of a stateful operator must not change during the query lifetime. Since we don't store the partitioning information of stateful operators in the checkpoint, there is no way around it other than enforcing the partitioning of stateful operators to be the "one" we basically expect.
   
   As I said in #32875, there is room for improvement, but that improvement must come after we fix this issue.
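   
   The constraint can be seen in a small simulation: each partition owns an independent state store, so if a key is routed to a different partition after a restart, its running aggregate silently starts again from zero. This is a hypothetical sketch; `StateStores` and `runBatch` are illustrative names, not Spark APIs, and state is kept in memory rather than in a checkpoint.
   
   ```scala
   import scala.collection.mutable
   
   // One independent key -> count map per partition, mimicking per-partition state stores.
   final class StateStores(numPartitions: Int) {
     private val perPartition = Array.fill(numPartitions)(mutable.Map.empty[String, Long])
   
     // Adds the batch's per-key counts to the store of the partition each key is routed to
     // and returns the updated running counts.
     def runBatch(batch: Seq[String], route: String => Int): Map[String, Long] =
       batch.groupBy(identity).map { case (key, rows) =>
         val store = perPartition(route(key))
         val updated = store.getOrElse(key, 0L) + rows.size
         store(key) = updated
         key -> updated
       }
   }
   
   val numPartitions = 4
   val stores = new StateStores(numPartitions)
   
   // Batch 1 runs with the partitioning the state was built with.
   val originalRoute: String => Int = k => Math.floorMod(k.hashCode, numPartitions)
   val afterBatch1 = stores.runBatch(Seq("a", "a", "b"), originalRoute)
   
   // After a restart the partitioning changes (every key shifted by one partition),
   // so "a" is looked up in a store that never saw it: the count restarts at 1, not 3.
   val changedRoute: String => Int = k => (originalRoute(k) + 1) % numPartitions
   val afterBatch2 = stores.runBatch(Seq("a"), changedRoute)
   ```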



[GitHub] [spark] viirya commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801899247



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,35 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] (and HashPartitioning in
+ * [[PartitioningCollection]]) can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(
+    expressions: Seq[Expression],
+    requiredNumPartitions: Option[Int] = None) extends Distribution {
+  require(
+    expressions != Nil,
+    "The expressions for hash of a StatefulOpClusteredDistribution should not be Nil. " +
+      "An AllTuples should be used to represent a distribution that only has " +
+      "a single partition.")
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,

Review comment:
       Hmm, is there any chance we specify empty `requiredNumPartitions`?
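   
   For reference, the assertion in the quoted `createPartitioning` accepts any partition count when `requiredNumPartitions` is `None` and otherwise requires an exact match. The hypothetical sketch below restates that condition with `Option.forall`; `numPartitionsCompatible` is an illustrative name, not part of Spark.
   
   ```scala
   // Equivalent to `requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions`.
   def numPartitionsCompatible(requiredNumPartitions: Option[Int], numPartitions: Int): Boolean =
     requiredNumPartitions.forall(_ == numPartitions)
   
   val noRequirement = numPartitionsCompatible(None, 5) // any count is accepted
   val exactMatch = numPartitionsCompatible(Some(5), 5)
   val mismatch = numPartitionsCompatible(Some(5), 10)  // the assert would fail
   ```
   
   In the diffs above, the stateful operators pass `stateInfo.map(_.numPartitions)`, so the value is `None` only when `stateInfo` itself is `None`.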



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801347810



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       We have a test for verifying this, although it is not exhaustive.
   
   https://github.com/apache/spark/blob/2e703ae9d210d26573b849a9a88c55667b24127d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala#L574-L605
   
   If we want to be exhaustive, I can build combinations of repartitions that would not have triggered a shuffle to hash partitioning on the join keys if stream-stream join used ClusteredDistribution. Even that may not be exhaustive enough to be future-proof, though.
   
   Instead, if we are pretty sure StatefulOpClusteredDistribution works as expected, we can simply check the required child distribution in the physical plan of stream-stream join.



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801179051



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       >> But we don't define "repartition just before stateful operator" as "unsupported operation across query lifetime", no?
   
   > I ran the query in StreamingAggregationSuite.scala and it seems fine. I briefly checked UnsupportedOperationChecker.scala and didn't find we disallow "repartition just before stateful operator".
   
   That is the problem we have. We didn't disallow the cases that bring silent correctness issues.



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802045995



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       Please refer to the long comment thread https://github.com/apache/spark/pull/35419#discussion_r801162163
   
   We have to fix them, but we should have a plan to avoid silently breaking existing queries. We need more time to think through how to address queries that are "already broken". They seem to have been broken since their introduction (Spark 2.2+), so someone may even be leveraging the relaxed requirement as a "feature", even though it would be very risky for them to adjust the partitioning themselves. Even in that case we can't simply break their query.
   
   I'll create a new JIRA ticket and/or start a discussion thread on dev@ about this. I need some time to build a plan (with options) to address this safely.



[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801354342



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       Oh, actually I was referring to the assumption:
   
   * `HashPartitioning.partitionIdExpression` has to be exactly `Pmod(new Murmur3Hash(expressions), Literal(numPartitions))`.
   
   It would just mean adding some logic to check `opA/opB.partitionIdExpression` for `opA/opB` at lines 598/599. I can also do it later if it's not clear to you.
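   
   The suggested check could look roughly like the following. This is a self-contained sketch with a tiny hypothetical expression ADT (`Expr`, `Attr`, `Literal`, `Murmur3Hash`, `Pmod`) that mirrors the names in the assumption; it is not Catalyst's API, where the real test would compare `opA/opB.partitionIdExpression` from the executed plan (and the real `Murmur3Hash` expression also carries a seed, omitted here).
   
   ```scala
   // Hypothetical mini expression tree mirroring the shape of the assumption.
   sealed trait Expr
   final case class Attr(name: String) extends Expr
   final case class Literal(value: Int) extends Expr
   final case class Murmur3Hash(children: Seq[Expr]) extends Expr
   final case class Pmod(left: Expr, right: Expr) extends Expr
   
   // True only if the partition id expression is exactly
   // Pmod(Murmur3Hash(expressions), Literal(numPartitions)).
   def hasExpectedPartitionIdExpression(
       partitionIdExpression: Expr,
       expressions: Seq[Expr],
       numPartitions: Int): Boolean =
     partitionIdExpression == Pmod(Murmur3Hash(expressions), Literal(numPartitions))
   
   val joinKeys = Seq(Attr("key"), Attr("window"))
   val expected = Pmod(Murmur3Hash(joinKeys), Literal(5))
   val reordered = Pmod(Murmur3Hash(joinKeys.reverse), Literal(5))
   ```
   
   Comparing the whole tree structurally also catches reordered keys or a different partition count, both of which would change where a key lands.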



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801178448



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       > If we make the change here, I assume one extra shuffle on ($"_1", $"_2") would be introduced, and it might yield incorrect result when running the new query plan against the existing state store?
   
   Unfortunately, yes. We may need to build tools to analyze the state and repartition it if the partitioning is already messed up. But leaving this as is would increase the chance of users' state becoming nondeterministic.
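   
   As a rough illustration of what such a tool might do, the hypothetical sketch below scans per-partition state, flags entries that are not in the partition the expected hash partitioning assigns, and redistributes them. Everything here is an assumption for illustration: state is an in-memory `Map` per partition, `String.hashCode` stands in for Spark's hash, and duplicated keys are merged by summing, which is only valid for count-like state.
   
   ```scala
   // Hypothetical stand-in for the expected partitioning: pmod(hash(key), numPartitions).
   def expectedPartition(key: String, numPartitions: Int): Int =
     Math.floorMod(key.hashCode, numPartitions)
   
   // Finds (key, currentPartition) pairs whose state lives in the "wrong" partition.
   def misplaced(state: Vector[Map[String, Long]]): Seq[(String, Int)] =
     for {
       (store, pid) <- state.zipWithIndex
       key <- store.keys.toSeq
       if expectedPartition(key, state.size) != pid
     } yield (key, pid)
   
   // Rebuilds per-partition state so that every key sits in its expected partition.
   def repartitionState(state: Vector[Map[String, Long]]): Vector[Map[String, Long]] = {
     val n = state.size
     // A key split across partitions is merged by summing (only valid for counts).
     val merged: Map[String, Long] =
       state.flatMap(_.toSeq).groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
     Vector.tabulate(n)(pid => merged.filter { case (k, _) => expectedPartition(k, n) == pid })
   }
   
   // "a" was aggregated in both partitions; "b" and "c" sit in the wrong one.
   val messedUp = Vector(Map("a" -> 2L, "c" -> 1L), Map("a" -> 3L, "b" -> 4L))
   val repartitioned = repartitionState(messedUp)
   ```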



[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r805734667



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.

Review comment:
       #35512



[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032263480


   (I'm reading through #19080 and just realized that, if I understand correctly, there was NO such strict requirement before we introduced HashClusteredDistribution in #19080... If so, the issue doesn't even start from 2.3+. It starts from 2.2+, where we introduced stateful operators in SS.)



[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032292814


   Just updated the JIRA ticket, PR description, and PR code diff to only contain the change for stream-stream join. I'll file another JIRA ticket for dealing with the other stateful operators.



[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801343068



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.

Review comment:
       Do we also want to briefly explain that this only applies to `StreamingSymmetricHashJoinExec` for now, and the challenge of applying it to other stateful operators? Maybe we can also file a JIRA for the other stateful operators and leave a TODO here.




[GitHub] [spark] viirya commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801904424



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       There are other `ClusteredDistribution` usages in statefulOperators; do we need to update them too?




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802084404



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       > it could be possible someone is even leveraging the relaxed requirement as a "feature"
   
   Suppose they have a stream of event logs with (userId, time, blabla) and run a time-window aggregation like this:
   
   ```
   import org.apache.spark.sql.functions.{col, count, window}
   
   // df: a streaming DataFrame of event logs with columns (userId, time, ...)
   df
     .withWatermark("time", "1 hour")
     .groupBy(col("userId"), window(col("time"), "10 minutes"))
     .agg(count("*"))
   ```
   
   groupBy won't trigger a shuffle for various output partitionings of df, since streaming aggregation only requires ClusteredDistribution. Users may have set up that partitioning intentionally, either 1) to avoid a shuffle, or 2) to control the partitioning and deal with skew. (I can't easily think of skew arising from hashing "grouping keys + time window", but once they see it, they will try to fix it.)
   
   Both are very risky (as of now, changing the partitioning during the query lifetime leads to correctness issues), but it is still the users' intention and they have already done it, so we can't simply enforce the partitioning and silently break their queries again.
   
   Furthermore, we seem to allow a data source to report its own output partitioning, which can satisfy ClusteredDistribution. This is still very risky from the stateful operator's perspective, but as long as the output partitioning is guaranteed not to change, it's a great chance to avoid an (unnecessary) shuffle.
   (Just speaking hypothetically: a stateful operator has to require a specific output partitioning once the state is built, so it's unlikely that we can leverage the data source's partitioning. We may find a way later, but not now.)





[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801135672



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       > Do we also need to update HashShuffleSpec so that two HashPartitionings can be compatible with each other when checking against StatefulOpClusteredDistributions? this is the previous behavior where Spark would avoid shuffle if both sides of the streaming join are co-partitioned.
   
   Each input must follow the required distribution provided by the stateful operator in order to respect the state partitioning. State partitioning is first-class, so even if both sides of the streaming join are co-partitioned, Spark must perform a shuffle if they don't match the state partitioning. (If that was the previous behavior, we broke something at some point.)
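A minimal sketch of the rule described above, using simplified stand-in types rather than Spark's `HashPartitioning`/`StatefulOpClusteredDistribution` (the names `HashPart`, `StateReq`, `needsShuffle`, and `coPartitioned` are hypothetical): each join side is checked against the state partitioning on its own, so the two sides being co-partitioned with each other does not avoid the shuffle.

```scala
object StatefulJoinShuffleSketch {
  // Hypothetical stand-in for a hash partitioning: ordered key columns + partition count.
  final case class HashPart(keys: Seq[String], numPartitions: Int)

  // Hypothetical stand-in for the state partitioning that one join side must match.
  final case class StateReq(keys: Seq[String], numPartitions: Int)

  // A side avoids a shuffle only if it matches the state partitioning exactly.
  def needsShuffle(child: HashPart, req: StateReq): Boolean =
    child.keys != req.keys || child.numPartitions != req.numPartitions

  // Rough co-partitioning check between the two sides (same arity, same partition count).
  // Note that needsShuffle never consults this.
  def coPartitioned(left: HashPart, right: HashPart): Boolean =
    left.keys.length == right.keys.length && left.numPartitions == right.numPartitions
}
```

For example, if both sides are hash-partitioned into 10 partitions on their join keys but the state was built with 5 partitions, `needsShuffle` is true for each side even though `coPartitioned` holds.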




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801185466



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       The problem arises because ClusteredDistribution has a much more relaxed requirement. What we really need to require for "any" stateful operator, including stream-stream join, is that for all children, a tuple with a specific grouping key must be bound to a deterministic partition "ID", which only HashClusteredDistribution can guarantee.
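To make the "deterministic partition ID" requirement concrete, here is a sketch assuming a partition ID computed as a non-negative modulo of a hash of the grouping-key values, which is the shape of `HashPartitioning`'s partition ID (a pmod of a Murmur3 hash). `scala.util.hashing.MurmurHash3` is only a stand-in and is not bit-compatible with Spark's hash; `partitionId` and `pmod` are hypothetical helpers. Because the ID is a pure function of the key values and the partition count, every child using the same function routes a given key to the same partition.

```scala
import scala.util.hashing.MurmurHash3

object DeterministicPartitionIdSketch {
  // Non-negative modulo, so negative hash values still map into [0, n).
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Partition ID as a pure function of the grouping-key values and the partition count.
  // MurmurHash3.seqHash is an illustrative stand-in, NOT Spark's Murmur3Hash.
  def partitionId(groupingKey: Seq[Any], numPartitions: Int): Int =
    pmod(MurmurHash3.seqHash(groupingKey), numPartitions)
}
```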




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802084404



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       > it could be possible someone is even leveraging the relaxed requirement as a "feature"
   
   Suppose they have a stream of event logs with (userId, time, blabla) and run a time-window aggregation like this:
   
   ```
   import org.apache.spark.sql.functions.{col, count, window}
   
   // df: a streaming DataFrame of event logs with columns (userId, time, ...)
   df
     .withWatermark("time", "1 hour")
     .groupBy(col("userId"), window(col("time"), "10 minutes"))
     .agg(count("*"))
   ```
   
   groupBy won't trigger a shuffle for various output partitionings of df, since streaming aggregation only requires ClusteredDistribution. Users may have set up that partitioning intentionally, either 1) to avoid a shuffle, or 2) to control the partitioning and deal with skew. (I can't easily think of skew arising from hashing "grouping keys + time window", but once they see it, they will try to fix it. ...Strictly speaking, they must not try to fix it, as the state partitioning would then no longer match the operator's partitioning...)
   
   Both are very risky (as of now, changing the partitioning during the query lifetime leads to correctness issues), but it is still the users' intention and they have already done it, so we can't simply enforce the partitioning and silently break their queries again.
   
   Furthermore, we seem to allow a data source to report its own output partitioning, which can satisfy ClusteredDistribution. This is still very risky from the stateful operator's perspective, but as long as the output partitioning is guaranteed not to change, it's a great chance to avoid an (unnecessary) shuffle.
   (Just speaking hypothetically: a stateful operator has to require a specific output partitioning once the state is built, so it's unlikely that we can leverage the data source's partitioning. We may find a way later, but not now.)




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801190907



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       HashClusteredDistribution also requires a specific number of partitions, so step 1) should fulfill it.





[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801192077



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       > That is the problem we have. We didn't disallow the case where it brings silent correctness issue.
   
   This is just a synthetic example I composed to verify my theory, but I think it might break more cases, such as `GROUP BY c1, c2` after `JOIN ON c1`. My point is that it would break queries that are partitioned on a subset of the group-by keys.
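A simplified model of the two satisfaction rules in question, using hypothetical helpers `satisfiesClustered` and `satisfiesStateful` over column names (not Spark's `Partitioning.satisfies`): the relaxed rule accepts a child hash-partitioned on any subset of the clustering keys, while the stateful rule requires the exact key list and partition count. A child shuffled by `JOIN ON c1` passes the first check for `GROUP BY c1, c2` but fails the second.

```scala
object SatisfactionRulesSketch {
  // Hypothetical stand-in for a hash partitioning: ordered key columns + partition count.
  final case class HashPart(keys: Seq[String], numPartitions: Int)

  // ClusteredDistribution-style rule: every hash key must appear among the clustering keys.
  def satisfiesClustered(p: HashPart, clustering: Seq[String], required: Option[Int]): Boolean =
    required.forall(_ == p.numPartitions) && p.keys.forall(clustering.contains)

  // StatefulOpClusteredDistribution-style rule: same keys in the same order, same count.
  def satisfiesStateful(p: HashPart, keys: Seq[String], required: Option[Int]): Boolean =
    required.forall(_ == p.numPartitions) && p.keys == keys
}
```

Key order matters in the stateful rule because the hash is computed over the keys in order, so `(c2, c1)` generally yields different partition IDs than `(c1, c2)`.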
   
   
   
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801240853



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       Even today, multiple stateful operators don't work properly due to the global watermark, so we don't need to worry about the partitioning between stateful operators. We only need to worry about the partitioning between the upstream operator (in most cases non-stateful) and the stateful operator.




[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801162163



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       I am wondering if this change could introduce an extra shuffle for streaming aggregation. Previously the operator required `ClusteredDistribution`, and now it requires `StatefulOpClusteredDistribution`/`HashClusteredDistribution`.
   
   `ClusteredDistribution` is more relaxed than `HashClusteredDistribution` in the sense that a `HashPartitioning(c1)` can satisfy `ClusteredDistribution(c1, c2)` but cannot satisfy `HashClusteredDistribution(c1, c2)`. In short, `ClusteredDistribution` allows the child to be hash-partitioned on a subset of the required keys. So for aggregation, if the plan is already shuffled on a subset of the group-by columns, Spark will not add another shuffle before the group-by.
   
   For example:
   
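   ```
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.functions.count
   // Assumes Spark SQL implicits (for $"..." and encoders) and an implicit SQLContext
   // are in scope, as in Spark's streaming test suites.
   
   MemoryStream[(Int, Int)].toDF()
     .repartition($"_1")
     .groupBy($"_1", $"_2")
     .agg(count("*"))
     .as[(Int, Int, Long)]
   ```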
   
   and the query plan:
   
   ```
   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5940f7c2, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1940/1200613952@4861dac3
   +- *(4) HashAggregate(keys=[_1#588, _2#589], functions=[count(1)], output=[_1#588, _2#589, count(1)#596L])
      +- StateStoreSave [_1#588, _2#589], state info [ checkpoint = file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/streaming.metadata-0d7cb004-92dd-4b0d-9d90-5a65c0d2934c/state, runId = 68598bd1-cf35-4bf7-a167-5f73dc9f4d84, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 1
         +- *(3) HashAggregate(keys=[_1#588, _2#589], functions=[merge_count(1)], output=[_1#588, _2#589, count#663L])
            +- StateStoreRestore [_1#588, _2#589], state info [ checkpoint = file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/streaming.metadata-0d7cb004-92dd-4b0d-9d90-5a65c0d2934c/state, runId = 68598bd1-cf35-4bf7-a167-5f73dc9f4d84, opId = 0, ver = 0, numPartitions = 5], 1
               +- *(2) HashAggregate(keys=[_1#588, _2#589], functions=[merge_count(1)], output=[_1#588, _2#589, count#663L])
                  +- *(2) HashAggregate(keys=[_1#588, _2#589], functions=[partial_count(1)], output=[_1#588, _2#589, count#663L])
                     +- Exchange hashpartitioning(_1#588, 5), REPARTITION_BY_COL, [id=#2008]
                        +- *(1) Project [_1#588, _2#589]
                           +- MicroBatchScan[_1#588, _2#589] MemoryStreamDataSource
   ```
   
   One can argue the previous behavior for streaming aggregation is not wrong: as long as all rows with the same keys are colocated in the same partition, `StateStoreRestore/Save` should output the correct answer. If we make the change here, I assume one extra shuffle on `($"_1", $"_2")` would be introduced, and it might yield incorrect results when running the new query plan against the existing state store?
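A sketch of the concern in the last question, assuming a toy partition-ID function (`MurmurHash3.seqHash` plus a non-negative modulo, not Spark's hash) and hypothetical helper names: state was written while rows were routed by `_1` alone, and a plan that re-shuffles on `(_1, _2)` routes some keys to a different partition ID, whose state store holds none of their earlier state.

```scala
import scala.util.hashing.MurmurHash3

object StateRelocationSketch {
  // Toy partition-ID function: non-negative hash of the key values modulo n.
  def partitionId(keyValues: Seq[Any], n: Int): Int = {
    val r = MurmurHash3.seqHash(keyValues) % n
    if (r < 0) r + n else r
  }

  // Grouping keys (_1, _2) whose state was written under hash(_1) but which a plan
  // shuffling on hash(_1, _2) would route to a different partition.
  def relocatedKeys(keys: Seq[(Int, Int)], n: Int): Seq[(Int, Int)] =
    keys.filter { case (a, b) => partitionId(Seq(a), n) != partitionId(Seq(a, b), n) }
}
```

Any key returned by `relocatedKeys` would be looked up in a state store partition that never saw its earlier aggregates, which is the silent-correctness risk being asked about.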




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802050145



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,35 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] (and HashPartitioning in
+ * [[PartitioningCollection]]) can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(
+    expressions: Seq[Expression],
+    requiredNumPartitions: Option[Int] = None) extends Distribution {
+  require(
+    expressions != Nil,
+    "The expressions for hash of a StatefulOpClusteredDistribution should not be Nil. " +
+      "An AllTuples should be used to represent a distribution that only has " +
+      "a single partition.")
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,

Review comment:
       In the sense of "defensive programming", it shouldn't. I just didn't change the implementation of HashClusteredDistribution, but now I think it's worth doing.
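One way to read the defensive-programming point, as a sketch only (the types `StatefulReq` and `HashPart` are hypothetical and do not mirror Spark's actual signatures): make the partition count mandatory instead of `Option[Int]`, and reject a `createPartitioning` call with any other count. Using `require` rather than `assert` keeps the check in place, since Scala 2's `assert` is `@elidable` and can be removed with `-Xelide-below`, while `require` is not.

```scala
object DefensiveDistributionSketch {
  // Hypothetical stand-in for a hash partitioning.
  final case class HashPart(keys: Seq[String], numPartitions: Int)

  // Hypothetical defensive variant: the partition count is mandatory, not Option[Int].
  final case class StatefulReq(keys: Seq[String], requiredNumPartitions: Int) {
    require(keys.nonEmpty, "use an AllTuples-style requirement for a single partition")
    require(requiredNumPartitions > 0, "requiredNumPartitions must be positive")

    // Fails fast instead of silently creating a partitioning that differs from the state's.
    def createPartitioning(numPartitions: Int): HashPart = {
      require(numPartitions == requiredNumPartitions,
        s"expected $requiredNumPartitions partitions to match the state, got $numPartitions")
      HashPart(keys, numPartitions)
    }
  }
}
```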




[GitHub] [spark] viirya commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801901230



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,35 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.

Review comment:
       nit: do we need to put "structured streaming" before "stateful operator"?




[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801341626



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       Shall we enforce this assumption in a unit test as well, e.g. in `StreamingJoinSuite`? It's great to highlight it in a comment here, but people tend to forget, whereas a unit test will fail loudly when we introduce an invalid change.
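
One way to make the "must be unchanged across Spark versions" requirement fail loudly is a golden-value test: record the partition id computed for a fixed set of keys once, check those literals into the test, and compare on every build. The sketch below is a self-contained illustration, not Spark's test code. `partitionId` is a hypothetical stand-in with the same shape as `partitionIdExpression` (a positive modulo of a hash of the keys), and it uses `scala.util.hashing.MurmurHash3`, which is a different function from the Murmur3 hash Spark evaluates, so its values would not match Spark's.

```scala
import scala.util.hashing.MurmurHash3

object PartitionIdGoldenCheck {
  // Positive modulo: the result is always in [0, n), even for negative hashes.
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Hypothetical stand-in for HashPartitioning.partitionIdExpression.
  // NOTE: scala.util.hashing.MurmurHash3 is NOT Spark's Murmur3Hash; illustration only.
  def partitionId(key: String, numPartitions: Int): Int =
    pmod(MurmurHash3.stringHash(key, 42), numPartitions)

  // Compares freshly computed ids with ids recorded from an earlier version and
  // returns the keys whose partition id changed; a real test would fail if non-empty.
  def changedKeys(recorded: Map[String, Int], numPartitions: Int): Seq[String] =
    recorded.toSeq.collect {
      case (key, expected) if partitionId(key, numPartitions) != expected => key
    }.sorted
}
```

In a real suite, `recorded` would be a literal map written once from a released version rather than computed inside the test, so that any change to the hash or the modulo shows up as a failing key.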






[GitHub] [spark] viirya commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1033094450


   Thanks @HeartSaVioR . LGTM




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801362825



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       We check HashPartitioning and partitionExpression here; what remains is partitionIdExpression, which is part of the implementation of HashPartitioning.
   
   That said, it would be nice to have a separate test against HashPartitioning if we don't have one already. Could you please check and write one if it's missing?






[GitHub] [spark] sunchao commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801186921



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       I think one behavior difference between this PR and the state before https://github.com/apache/spark/pull/32875 is that, previously, we'd also check `spark.sql.shuffle.partitions` and insert a shuffle if there's not enough parallelism from the input. However, this PR doesn't do that, since it skips step 2) above.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801185466



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       The problem arose because ClusteredDistribution has a much more relaxed requirement. What we really need to require is that, for all children, a tuple with a given grouping key is bound to a deterministic partition "ID", which only HashClusteredDistribution could guarantee.
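
The gap between the relaxed and the strict requirement can be modelled with plain case classes. In this hypothetical sketch, expressions are column names, and `Clustered` / `StatefulOpClustered` are simplified stand-ins for `ClusteredDistribution` / `StatefulOpClusteredDistribution`: hashing on a subset of the clustering keys satisfies the relaxed one, while the strict one accepts only the same keys in the same order with the same number of partitions.

```scala
object DistributionModel {
  // Hypothetical, simplified stand-ins; expressions are represented by column names.
  sealed trait Distribution
  final case class Clustered(keys: Seq[String], requiredNumPartitions: Option[Int])
    extends Distribution
  final case class StatefulOpClustered(keys: Seq[String], requiredNumPartitions: Int)
    extends Distribution

  final case class HashPartitioning(exprs: Seq[String], numPartitions: Int)

  def satisfies(p: HashPartitioning, d: Distribution): Boolean = d match {
    // Relaxed: hashing on any subset of the clustering keys keeps equal keys together,
    // but which partition id a key lands in depends on the subset that was chosen.
    case Clustered(keys, n) =>
      p.exprs.forall(e => keys.contains(e)) && n.forall(_ == p.numPartitions)
    // Strict: same expressions, same order, same partition count, so every key maps
    // to the partition id the state store was originally built with.
    case StatefulOpClustered(keys, n) =>
      p.exprs == keys && p.numPartitions == n
  }
}
```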






[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1033068920


   @viirya Could you please take a look again? I've reflected your review comments via [753a5b6](https://github.com/apache/spark/pull/35419/commits/753a5b6c62c88f1a9e0318ebcd655cb04beee058)




[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801366183



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       Sure I can add one later this week.






[GitHub] [spark] sunchao commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r800875604



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       I like the new name :) Thanks for making it more specific.
   
   Do we also need to update `HashShuffleSpec` so that two `HashPartitioning`s can be compatible with each other when checking against `StatefulOpClusteredDistribution`s? This is the previous behavior, where Spark would avoid a shuffle if both sides of the streaming join are co-partitioned.






[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801350561



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.

Review comment:
       `only [[HashPartitioning]] can satisfy this distribution.` -> `only [[HashPartitioning]], and [[PartitioningCollection]] of [[HashPartitioning]] can satisfy this distribution.` ?
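
Following that suggestion, a `PartitioningCollection` can be treated as satisfying a distribution when any of its members does. A minimal sketch under the same simplifications (column names instead of expressions); all types here are hypothetical stand-ins, not Spark's classes.

```scala
object PartitioningCollectionModel {
  // Hypothetical, simplified stand-ins; expressions are represented by column names.
  sealed trait Partitioning
  final case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning
  final case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  // Strict requirement of a stateful operator: same keys, same order, same partition count.
  final case class StatefulOpClustered(keys: Seq[String], numPartitions: Int)

  def satisfies(p: Partitioning, d: StatefulOpClustered): Boolean = p match {
    case HashPartitioning(exprs, n) => exprs == d.keys && n == d.numPartitions
    // A collection describes one physical layout in several equivalent ways,
    // so it satisfies the requirement if any member does.
    case PartitioningCollection(ps) => ps.exists(member => satisfies(member, d))
    case UnknownPartitioning => false
  }
}
```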
   
   






[GitHub] [spark] c21 commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032291084


   > So if we want to split down the problem with stream-stream join and others, I'll move out the changes on other stateful operators, and in there we have to also come up with the plan how to deal with state on existing queries.
   
   +1, SGTM.




[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032346649


   In the meantime, I'll think about how to deal with other stateful operators that have existing state. One rough idea is to validate the state partitioning against the child output partitioning, although such a validation is not "future-proof".
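
The validation idea could look roughly like the following, assuming the checkpoint recorded the stateful operator's partitioning when the query first ran. `StatePartitioningInfo` and its fields are hypothetical; the checkpoint does not currently store this information. It is also not future-proof in exactly the sense above: it only detects changes in the fields it records, so a behavior change in a hash function that keeps the same name would go unnoticed.

```scala
object StatePartitioningValidation {
  // Hypothetical metadata a checkpoint could record for a stateful operator.
  final case class StatePartitioningInfo(keys: Seq[String], numPartitions: Int, hashFunction: String)

  // Compares the recorded partitioning with the child's current output partitioning.
  // Returns human-readable problems; an empty result means they match on every recorded field.
  def validate(stored: StatePartitioningInfo, current: StatePartitioningInfo): Seq[String] = {
    val problems = Seq.newBuilder[String]
    if (stored.numPartitions != current.numPartitions)
      problems += s"numPartitions changed: ${stored.numPartitions} -> ${current.numPartitions}"
    if (stored.keys != current.keys)
      problems += s"partitioning keys changed: ${stored.keys.mkString(",")} -> ${current.keys.mkString(",")}"
    if (stored.hashFunction != current.hashFunction)
      problems += s"hash function changed: ${stored.hashFunction} -> ${current.hashFunction}"
    problems.result()
  }
}
```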




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801168066



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       But we don't define "repartition just before stateful operator" as an "unsupported operation across query lifetime", do we?
   
   The thing is that once the query has run, the partitioning of the stateful operator must not change during its lifetime. Since we don't store information about the stateful operator's partitioning in the checkpoint, we have no option other than enforcing the partitioning of the stateful operator to be the "one" we expect.
   
   As I said in #32875, there is room for improvement, but that improvement must come after we fix this issue.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801168066



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       But we don't define "repartition just before stateful operator" as an "unsupported operation across query lifetime", do we?
   
   The thing is that once the query has run even once, the partitioning of the stateful operator must not change during its lifetime. Since we don't store information about the stateful operator's partitioning in the checkpoint, we have no option other than enforcing the partitioning of the stateful operator to be the "one" we expect.
   
   As I said in #32875, there is room for improvement, but that improvement must come after we fix this issue.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801240853



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       Even nowadays, multiple stateful operators don't work properly due to the global watermark, so we don't need to worry about the partitioning between stateful operators. We only need to worry about the partitioning between the upstream (non-stateful) operator and the stateful operator.






[GitHub] [spark] sunchao commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801161076



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       Actually, I think this PR **will skip shuffle** if both sides of a streaming join are co-partitioned. In `EnsureRequirements`, we currently mainly do two things:
   1. check if output partitioning can satisfy the required distribution
   2. if there are two children, check if they are compatible with each other, and insert shuffle if not.
   
   In step 2) we only consider `ClusteredDistribution` at the moment, so for `StatefulOpClusteredDistribution` this step is simply skipped. Consequently, Spark will skip the shuffle as long as step 1) succeeds.
   
   > State partitioning is the first class, so even both sides of the streaming join are co-partitioned, Spark must perform shuffle if they don't match with state partitioning.
   
   I'm not quite sure about this. Shouldn't we retain the behavior before https://github.com/apache/spark/pull/32875? Quoting [the comment](https://github.com/apache/spark/pull/32875#issuecomment-1023829161) from @cloud-fan:
   
   > I think this is kind of a potential bug. Let's say that we have 2 tables that can report hash partitioning optionally (e.g. controlled by a flag). Assume a streaming query is first run with the flag off, which means the tables do not report hash partitioning, then Spark will add shuffles before the stream-stream join, and the join state (streaming checkpoint) is partitioned by Spark's murmur3 hash function. Then we restart the streaming query with the flag on, and the 2 tables report hash partitioning (not the same as Spark's murmur3). Spark will not add shuffles before stream-stream join this time, and leads to wrong result, because the left/right join child is not co-partitioned with the join state in the previous run.
   
   If we respected co-partitioning and avoided the shuffle before https://github.com/apache/spark/pull/32875 but start shuffling after this PR, couldn't an issue similar to the one described in that comment happen?
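
The two steps listed above, and why step 2) is bypassed for the strict distribution, can be modelled in a few lines. This is a heavily simplified, hypothetical sketch of the decision, not `EnsureRequirements` itself: the names are stand-ins, expressions are column names, co-partitioning is reduced to "same number of partitions and the partitioning keys sit at the same positions in each side's required keys", and an incompatible pair simply shuffles both sides.

```scala
object EnsureRequirementsModel {
  // Hypothetical stand-ins; expressions are represented by column names.
  sealed trait Distribution { def keys: Seq[String] }
  final case class Clustered(keys: Seq[String]) extends Distribution
  final case class StatefulOpClustered(keys: Seq[String], numPartitions: Int) extends Distribution

  final case class HashPartitioning(exprs: Seq[String], numPartitions: Int)

  def satisfies(p: HashPartitioning, d: Distribution): Boolean = d match {
    case Clustered(keys)              => p.exprs.forall(e => keys.contains(e))
    case StatefulOpClustered(keys, n) => p.exprs == keys && p.numPartitions == n
  }

  // Position of each partitioning key within the child's required keys.
  private def positions(p: HashPartitioning, d: Distribution): Seq[Int] =
    p.exprs.map(e => d.keys.indexOf(e))

  // Returns, per child, whether a shuffle would be inserted in front of it.
  def plan(children: Seq[(HashPartitioning, Distribution)]): Seq[Boolean] = {
    // Step 1: a child whose output does not satisfy its required distribution is shuffled.
    val step1 = children.map { case (p, d) => !satisfies(p, d) }
    // Step 2: only when both children require the relaxed Clustered distribution,
    // check that they are co-partitioned; if not, shuffle both (a simplification).
    val allClustered = children.forall { case (_, d) => d.isInstanceOf[Clustered] }
    if (children.size == 2 && allClustered) {
      val (lp, ld) = children(0)
      val (rp, rd) = children(1)
      val compatible =
        lp.numPartitions == rp.numPartitions && positions(lp, ld) == positions(rp, rd)
      if (step1.contains(true) || !compatible) Seq(true, true) else Seq(false, false)
    } else {
      // Otherwise (e.g. StatefulOpClustered children), step 2 is skipped entirely.
      step1
    }
  }
}
```

For example, a left side hash-partitioned on its first join key and a right side hash-partitioned on its second join key both pass step 1) under `Clustered`, but step 2) rejects them as not co-partitioned and both sides are shuffled.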






[GitHub] [spark] HeartSaVioR edited a comment on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032292814


   Just updated the JIRA ticket, PR description, and PR code diff to only contain the change for stream-stream join. Now this PR is effectively a partial revert of SPARK-35703, which hasn't been released. Therefore, this PR doesn't introduce any breaking change.
   
   I'll file another JIRA ticket for dealing with other stateful operators.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032263480


   (I'm reading through #19080, and just realized there was NO such strict requirement before we introduced HashClusteredDistribution in #19080, if I understand correctly... If so, the issue doesn't even start from 2.3+. It starts from 2.2+, where we introduced stateful operators in SS.)




[GitHub] [spark] viirya commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801904424



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       There are other `ClusteredDistribution` usages in statefulOperators, e.g. `ClusteredDistribution`; do we need to update them too? As they are also stateful operators, don't they also need the strict partitioning requirement?






[GitHub] [spark] viirya commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801904424



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -185,8 +185,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    StatefulOpClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      StatefulOpClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       There are other `ClusteredDistribution` usages in statefulOperators, e.g. `ClusteredDistribution`; do we need to update them too?






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r802048571



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,35 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.

Review comment:
       I think "stateful" already implies the streaming context, but it's no big deal if we repeat it here.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801347810



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       We have a test for verifying this, although it is not exhaustive.
   
   https://github.com/apache/spark/blob/2e703ae9d210d26573b849a9a88c55667b24127d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala#L574-L605
   
   If we want to be more exhaustive, I can build combinations of repartitions that would not have triggered a shuffle with hash partitioning on the joining keys if stream-stream join used ClusteredDistribution. Even that may not be exhaustive enough to be future-proof, though.
   
   Instead, if we are pretty sure StatefulOpClusteredDistribution works as expected, we can simply check the required child distribution in the physical plan of stream-stream join, and additionally check that the output partitioning of each child is HashPartitioning on the joining keys (this effectively verifies StatefulOpClusteredDistribution).
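A minimal sketch of the second approach, assuming a toy plan model: `JoinPlanCheckSketch`, `StreamStreamJoin`, `Child` and the simplified `HashPartitioning` below are hypothetical stand-ins, not Spark's physical plan classes. A real test would instead inspect the executed plan's `StreamingSymmetricHashJoinExec` node and its children.

```scala
// Toy model only: these types are hypothetical stand-ins for Spark's physical plan nodes.
object JoinPlanCheckSketch {
  sealed trait Partitioning
  final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  final case class Child(outputPartitioning: Partitioning)

  // Simplified stream-stream join node: join keys per side, the number of state
  // partitions recorded for the query, and the two children.
  final case class StreamStreamJoin(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      numStatePartitions: Int,
      left: Child,
      right: Child)

  // True when each child is hash-partitioned on exactly its own join keys,
  // using the same number of partitions as the state store.
  def childrenMatchStatePartitioning(j: StreamStreamJoin): Boolean = {
    def matches(c: Child, keys: Seq[String]): Boolean =
      c.outputPartitioning == HashPartitioning(keys, j.numStatePartitions)
    matches(j.left, j.leftKeys) && matches(j.right, j.rightKeys)
  }
}
```

Checking the plan this way verifies the contract directly, instead of enumerating inputs that might have slipped past a shuffle.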






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801353321



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.

Review comment:
       The issue is where to leave the comment. We are unlikely to look at StatefulOpClusteredDistribution often, so a comment has a better chance of getting attention if we put it on every stateful operator wherever it uses ClusteredDistribution. It is totally redundant, but it gives a warning sign whenever someone tries to change it.






[GitHub] [spark] cloud-fan commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801260669



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       Yes, `StatefulOpClusteredDistribution` is very strict and requires numPartitions as well. I don't think we need an extra co-partition check for it.
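To illustrate why the strict distribution needs no extra co-partition check, here is a toy comparison of the two satisfaction rules, assuming plain column names stand in for Catalyst expressions. `DistributionSketch` and its methods are hypothetical and only approximate the spirit of `ClusteredDistribution` and `StatefulOpClusteredDistribution`, not their actual code.

```scala
// Toy model only: expressions are plain column names, and these functions are
// simplified stand-ins, not Spark's actual implementations.
object DistributionSketch {
  final case class HashPartitioning(expressions: Seq[String], numPartitions: Int)

  // Relaxed requirement, in the spirit of ClusteredDistribution: hash keys may be
  // any subset of the clustering keys; numPartitions is checked only if required.
  def clusteredSatisfied(
      p: HashPartitioning,
      clustering: Seq[String],
      requiredNumPartitions: Option[Int]): Boolean =
    p.expressions.forall(clustering.contains) &&
      requiredNumPartitions.forall(_ == p.numPartitions)

  // Strict requirement, in the spirit of StatefulOpClusteredDistribution: the same
  // keys in the same order and exactly the required number of partitions.
  def statefulOpSatisfied(
      p: HashPartitioning,
      keys: Seq[String],
      numPartitions: Int): Boolean =
    p.expressions == keys && p.numPartitions == numPartitions

  def main(args: Array[String]): Unit = {
    val byA = HashPartitioning(Seq("a"), 10)
    println(clusteredSatisfied(byA, Seq("a", "b"), Some(10))) // true: no shuffle inserted
    println(statefulOpSatisfied(byA, Seq("a", "b"), 10))      // false: shuffle required
  }
}
```

Because the strict rule pins both the keys and the partition count, any two inputs that satisfy it are already aligned with the state partitioning.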






[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801171476



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       > But we don't define "repartition just before stateful operator" as "unsupported operation across query lifetime", no?
   
   
   I ran the query in `StreamingAggregationSuite.scala` and it seems fine. I briefly checked `UnsupportedOperationChecker.scala` and didn't find anything that disallows "repartition just before stateful operator".
   
   > The thing is that once the query is run, the partitioning of stateful operator must not be changed during lifetime. Since we don't store the information of partitioning against stateful operator in the checkpoint, we have no way around other than enforcing the partitioning of stateful operator as the "one" what we basically expect.
   
   
   I agree with you @HeartSaVioR. I want to raise a concern here: based on my limited understanding, this might change the query plan for certain streaming aggregate queries (such as the synthetic query above), and it could break the existing state store when running with the next Spark 3.3 code.
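A toy illustration of this concern, assuming a stand-in hash (not Spark's `Murmur3Hash`) and integer key values; `PlanChangeSketch` is hypothetical. If an old plan placed the state for group `(a, b)` by `hash(a)` because a user repartition already satisfied the relaxed distribution, a new plan that shuffles by `hash(a, b)` can route the same group to a different partition, whose state store has never seen it.

```scala
// Toy model: a stand-in hash, not Spark's Murmur3Hash; values are Ints for brevity.
object PlanChangeSketch {
  // Deterministic toy hash over the key values.
  def toyHash(values: Seq[Int]): Int = values.foldLeft(42)((h, v) => 31 * h + v)

  // Non-negative modulo of the hash, so every row maps to [0, numPartitions).
  def partitionId(values: Seq[Int], numPartitions: Int): Int = {
    val m = toyHash(values) % numPartitions
    if (m < 0) m + numPartitions else m
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = (1, 3)
    // Old plan: repartition(a) satisfied the relaxed distribution, so state for
    // group (a, b) was written in the partition chosen by hash(a).
    val oldPartition = partitionId(Seq(a), 4)
    // New plan: the strict distribution forces a shuffle by hash(a, b).
    val newPartition = partitionId(Seq(a, b), 4)
    println(s"old=$oldPartition new=$newPartition") // old=3 new=0
  }
}
```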






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801347810



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -200,6 +228,11 @@ case object SinglePartition extends Partitioning {
  * Represents a partitioning where rows are split up across partitions based on the hash
  * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
  * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.

Review comment:
       We have a test for verifying this, although it is not exhaustive.
   
   https://github.com/apache/spark/blob/2e703ae9d210d26573b849a9a88c55667b24127d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala#L574-L605
   
   If we want to be more exhaustive, I can build combinations of repartitions that would not have triggered a shuffle with hash partitioning on the joining keys if stream-stream join used ClusteredDistribution. Even that may not be exhaustive enough to be future-proof, though.
   
   Instead, if we are pretty sure StatefulOpClusteredDistribution works as expected, we can simply check the required child distribution in the physical plan of stream-stream join, and check that the output partitioning of each child is HashPartitioning on the joining keys.
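Why `partitionIdExpression` must not change can be seen with a toy model, assuming two hypothetical hash functions standing in for "the hash used by version 1" and "the hash used by version 2" (neither is Spark's `Murmur3Hash`). The partition ID is a non-negative modulo of the hash by the number of partitions; any key whose ID differs between the two is routed to a state store that does not hold its state.

```scala
// Toy model: hashV1 and hashV2 are hypothetical stand-ins, not Spark's Murmur3Hash.
object PartitionIdStabilitySketch {
  val hashV1: Int => Int = k => k * 31 + 7
  val hashV2: Int => Int = k => k * 17 + 3

  // Non-negative modulo, so negative hash values still map to [0, n).
  def pmod(x: Int, n: Int): Int = {
    val m = x % n
    if (m < 0) m + n else m
  }

  def partitionId(hash: Int => Int, key: Int, numPartitions: Int): Int =
    pmod(hash(key), numPartitions)

  // Keys that would be routed to a different partition (and thus a different
  // state store) after switching from hashV1 to hashV2.
  def movedKeys(keys: Seq[Int], numPartitions: Int): Seq[Int] =
    keys.filter { k =>
      partitionId(hashV1, k, numPartitions) != partitionId(hashV2, k, numPartitions)
    }

  def main(args: Array[String]): Unit =
    println(movedKeys(0 until 4, 4)) // keys 1 and 3 move
}
```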






[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032225949


   I see the concern about fixing stateful operators, given the state of existing queries. Stream-stream join has been using HashClusteredDistribution, so it shouldn't suffer from this long-standing problem.
   
   So if we want to split the problem into stream-stream join and the others, I'll move the changes for the other stateful operators out of this PR; there, we also have to come up with a plan for how to deal with the state of existing queries.
   
   Would that work for everyone?




[GitHub] [spark] sunchao commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801875087



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       I see, it should be good then!






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801135672



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       > Do we also need to update HashShuffleSpec so that two HashPartitionings can be compatible with each other when checking against StatefulOpClusteredDistribution? This is the previous behavior where Spark would avoid a shuffle if both sides of the streaming join are co-partitioned.
   
   Each input must follow the required distribution provided by the stateful operator to respect the state partitioning requirement. State partitioning is first-class, so even if both sides of the streaming join are co-partitioned, Spark must perform a shuffle if they don't match the state partitioning. (If that was the previous behavior, we broke something at some point.)
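A minimal sketch of that rule under a toy model, assuming a hypothetical `StateFirstSketch` with column names as keys: two children can be co-partitioned with each other and still both need a shuffle, because each is compared against the state partitioning, not against the other side.

```scala
// Toy model: a simplified HashPartitioning and a hypothetical planner check.
object StateFirstSketch {
  final case class HashPartitioning(keys: Seq[String], numPartitions: Int)

  // Loosely: both sides hash the same number of keys into the same number of partitions.
  def coPartitioned(left: HashPartitioning, right: HashPartitioning): Boolean =
    left.numPartitions == right.numPartitions && left.keys.length == right.keys.length

  // A child needs a shuffle unless it already matches the state partitioning exactly.
  def needsShuffle(child: HashPartitioning, stateKeys: Seq[String], statePartitions: Int): Boolean =
    child != HashPartitioning(stateKeys, statePartitions)

  def main(args: Array[String]): Unit = {
    val left = HashPartitioning(Seq("k"), 8)
    val right = HashPartitioning(Seq("k"), 8)
    // The state was created with 4 partitions in an earlier run of the query.
    println(coPartitioned(left, right))       // true
    println(needsShuffle(left, Seq("k"), 4))  // true
    println(needsShuffle(right, Seq("k"), 4)) // true
  }
}
```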






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801168066



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
       But we don't define "repartition" as an "unsupported operation across query lifetime", no?
   
   The thing is that once the query has run, the partitioning of a stateful operator must not change during the query's lifetime. Since we don't store the partitioning information of stateful operators in the checkpoint, we have no option other than enforcing the partitioning of the stateful operator to be the "one" we basically expect.
   
   As I said in #32875, there is room for improvement, but that improvement must come after we fix this issue.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801184382



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -90,6 +90,34 @@ case class ClusteredDistribution(
   }
 }
 
+/**
+ * Represents the requirement of distribution on the stateful operator.
+ *
+ * Each partition in stateful operator initializes state store(s), which are independent with state
+ * store(s) in other partitions. Since it is not possible to repartition the data in state store,
+ * Spark should make sure the physical partitioning of the stateful operator is unchanged across
+ * Spark versions. Violation of this requirement may bring silent correctness issue.
+ *
+ * Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
+ * stateful operator, only [[HashPartitioning]] can satisfy this distribution.
+ */
+case class StatefulOpClusteredDistribution(

Review comment:
       > 1. check if output partitioning can satisfy the required distribution
   
   For stream-stream join, once each input satisfies its required "hash" distribution, the inputs will be co-partitioned. Stream-stream join must guarantee this.
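A toy check of that co-partitioning property, assuming a stand-in hash and integer join key values; `CoPartitionSketch` is hypothetical. When both sides hash their join keys into the same number of partitions (the state's), equal keys meet in the same partition; if the partition counts differed, they might not.

```scala
// Toy model: the hash below is a stand-in, not Spark's Murmur3Hash.
object CoPartitionSketch {
  def pmod(x: Int, n: Int): Int = {
    val m = x % n
    if (m < 0) m + n else m
  }

  def partitionId(keyValues: Seq[Int], numPartitions: Int): Int =
    pmod(keyValues.foldLeft(42)((h, v) => 31 * h + v), numPartitions)

  // For every join key present on both sides, do the left and right rows land in
  // the same partition, given each side's partition count?
  def coPartitioned(
      leftKeys: Seq[Seq[Int]],
      rightKeys: Seq[Seq[Int]],
      leftN: Int,
      rightN: Int): Boolean =
    leftKeys.filter(rightKeys.contains).forall { k =>
      partitionId(k, leftN) == partitionId(k, rightN)
    }
}
```

With equal partition counts the check holds for any keys, since both sides apply the same deterministic function; that is what satisfying each side's required distribution provides.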






[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1031242871


   Implementation-wise, the PR is ready for review.
   cc. @cloud-fan @sunchao @viirya @HyukjinKwon @c21, who are listed as reviewers of #32875.
   cc. @tdas @zsxwing @brkyvz @jose-torres @xuanyuanking as reviewers in the streaming area.




[GitHub] [spark] HeartSaVioR commented on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1031234038


   TODO:
   I'll fix the build and make sure the tests pass.
   I'll also see whether I can add tests for stateful operators that check whether the physical partitioning stays consistent under several kinds of pre-partitioning.
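One way such a test could be structured, sketched under a toy model (`PrePartitioningTestSketch`, `Hash`, `RoundRobin` and both planner rules are hypothetical): enumerate several pre-partitionings of the input and assert that the stateful operator always ends up reading `Hash(stateKeys, n)`. The relaxed rule, shown for contrast, fails this check.

```scala
// Toy model: hypothetical partitionings and planner rules, not Spark's classes.
object PrePartitioningTestSketch {
  sealed trait Partitioning
  final case class Hash(keys: Seq[String], n: Int) extends Partitioning
  case object RoundRobin extends Partitioning

  type Rule = (Partitioning, Seq[String], Int) => Partitioning

  // Strict rule: keep the input partitioning only if it is exactly the state
  // partitioning; otherwise a shuffle produces the state partitioning.
  val strict: Rule = (pre, keys, n) =>
    if (pre == Hash(keys, n)) pre else Hash(keys, n)

  // Relaxed rule: any hash on a subset of the keys with the right count is kept.
  val relaxed: Rule = (pre, keys, n) =>
    pre match {
      case h @ Hash(ks, m) if ks.forall(keys.contains) && m == n => h
      case _ => Hash(keys, n)
    }

  val prePartitionings: Seq[Partitioning] = Seq(
    RoundRobin,
    Hash(Seq("a"), 4),
    Hash(Seq("b", "a"), 4),
    Hash(Seq("a", "b"), 2),
    Hash(Seq("a", "b"), 4))

  // Does the stateful operator always read the state partitioning?
  def consistent(rule: Rule, keys: Seq[String], n: Int): Boolean =
    prePartitionings.forall(pre => rule(pre, keys, n) == Hash(keys, n))
}
```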




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35419:
URL: https://github.com/apache/spark/pull/35419#issuecomment-1032292814


   Just updated the JIRA ticket, the PR description, and the PR code diff to contain only the change to stream-stream join. 
   
   Now this PR is effectively a partial revert of SPARK-35703, which hasn't been released. Hence, this PR doesn't introduce any breaking change.
   
   I'll file another JIRA ticket to deal with the other stateful operators.



