Posted to reviews@spark.apache.org by "eejbyfeldt (via GitHub)" <gi...@apache.org> on 2023/10/18 13:58:16 UTC

[PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

eejbyfeldt opened a new pull request, #43435:
URL: https://github.com/apache/spark/pull/43435

   
   ### What changes were proposed in this pull request?
   Potential fix for a correctness issue in 3.5.0.
   
   I bisected the bug to https://github.com/apache/spark/pull/40522, which introduced it. I do not understand the issue well enough to know whether this is the best way to fix it.
   
   
   ### Why are the changes needed?
   Correctness bug.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, fixed correctness issue.
   
   
   ### How was this patch tested?
   New and existing unit tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1784782064

   > We don't need this hack anymore and can safely remove the `if` branch.
   
   Is the suggestion to do that in this PR or is it better to do it in a follow up?




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375881810


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -306,6 +306,34 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
+
+}
+
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])
+  extends Expression with Partitioning with Unevaluable {
+
+  override def children: Seq[Expression] = from.expressions
+  override def nullable: Boolean = from.nullable
+  override def dataType: DataType = from.dataType
+
+  override def satisfies0(required: Distribution): Boolean = from.satisfies0(required)
+
+  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+    CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)
+
+  override protected def withNewChildrenInternal(
+    newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
+      copy(from = from.copy(expressions = newChildren))
+
+  override val numPartitions: Int = partitions.length
+
+  override def stringArgs: Iterator[Any] = Iterator(from)

Review Comment:
   this looks weird, maybe just override `toString` and `sql` to call `from.toString` / `from.sql`
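
A minimal sketch of the delegation suggested in this comment, assuming simplified stand-in types rather than Spark's real classes. `Node`, `HashPart`, and `CoalescedHashPart` are hypothetical names used only for illustration, and the string format is invented for the example.

```scala
object ToStringSketch {
  // Hypothetical stand-in for an expression-like node; NOT Spark's Expression.
  trait Node { def sql: String = toString }

  final case class HashPart(keys: Seq[String], numPartitions: Int) extends Node {
    // Illustrative format only; Spark's actual output may differ.
    override def toString: String =
      s"hashpartitioning(${keys.mkString(", ")}, $numPartitions)"
  }

  // The coalesced wrapper prints exactly like the partitioning it came from,
  // instead of trimming its constructor arguments via stringArgs.
  final case class CoalescedHashPart(from: HashPart, partitions: Seq[(Int, Int)])
      extends Node {
    override def toString: String = from.toString
    override def sql: String = from.sql
  }
}
```

With this shape, the coalesce boundaries never appear in the printed plan; whether that is desirable for debugging is a judgment call for the PR.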





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1772807296

   @ulysses-you can you take a look when you have time?




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375883625


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala:
##########
@@ -62,211 +62,254 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("compatibility: HashShuffleSpec on both sides") {
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10),
-        ClusteredDistribution(Seq($"a", $"b"))),
-      expected = true
-    )
-
-    checkCompatible(
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10), ClusteredDistribution(Seq($"a", $"b"))),
-      HashShuffleSpec(HashPartitioning(Seq($"a"), 10), ClusteredDistribution(Seq($"a", $"b"))),
-      expected = true
-    )
+  private def testHashShuffleSpecLike(
+    shuffleSpecName: String,

Review Comment:
   ditto





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1787597375

   Thank you, @eejbyfeldt and all.




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375942459


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -708,6 +736,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    @transient partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

Review Comment:
   I thought it would be held somewhere... After some searching, it seems `ShuffleSpec` is created dynamically, so there is no need to consider ser/de. We can remove the `@transient`.





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1784788544

   @eejbyfeldt nvm, I made a mistake. This is for coalesce; we can add a new partitioning for skew join handling (split and replicate partitions). It's unrelated to this PR and we can do it later.




Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368212280


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -700,14 +723,38 @@ case class HashShuffleSpec(
     }
   }
 
-  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+  override def createPartitioning(clustering: Seq[Expression]): HashPartitioning = {
     val exprs = hashKeyPositions.map(v => clustering(v.head))
     HashPartitioning(exprs, partitioning.numPartitions)
   }
 
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: HashShuffleSpec,
+    partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      numPartitions == 1
+    case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
+      partitions == otherPartitions &&
+      from.isCompatibleWith(otherParent)
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ =>
+      false
+  }
+
+  override def canCreatePartitioning: Boolean = from.canCreatePartitioning

Review Comment:
   That makes sense. I removed this, and that made the spec reproducing the bug pass.





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368512972


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -306,6 +306,34 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
+
+}
+
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])

Review Comment:
   Probably. What is the reason behind it? Is it that we never intentionally want to serialize this class, and if we do, we will not really use it?





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375905803


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -708,6 +736,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    @transient partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

Review Comment:
   It is to avoid unnecessary ser/de. We only use it on the driver side to check compatibility with other shuffle specs.





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375895171


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -708,6 +736,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    @transient partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

Review Comment:
   I added it because of the suggestion from @ulysses-you here: https://github.com/apache/spark/pull/43435#discussion_r1368494061





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368085648


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -700,14 +723,38 @@ case class HashShuffleSpec(
     }
   }
 
-  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+  override def createPartitioning(clustering: Seq[Expression]): HashPartitioning = {
     val exprs = hashKeyPositions.map(v => clustering(v.head))
     HashPartitioning(exprs, partitioning.numPartitions)
   }
 
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: HashShuffleSpec,
+    partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      numPartitions == 1
+    case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
+      partitions == otherPartitions &&
+      from.isCompatibleWith(otherParent)
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ =>
+      false
+  }
+
+  override def canCreatePartitioning: Boolean = from.canCreatePartitioning

Review Comment:
   It cannot create a partitioning because it is not a standard hash partitioning. Otherwise `EnsureRequirements` will take the created partitioning as a candidate partitioning.





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368213333


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -295,19 +284,53 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     }
   }
 
-  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
-    HashShuffleSpec(this, distribution)
-
   /**
    * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less
    * than numPartitions) based on hashing expressions.
    */
   def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
+}
+
+/**
+ * Represents a partitioning where rows are split up across partitions based on the hash
+ * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
+ * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.
+ */
+case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+  extends HashPartitioningBase {
+
+  override def createShuffleSpec(distribution: ClusteredDistribution): HashShuffleSpec =
+    HashShuffleSpec(this, distribution)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
 }
 
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])

Review Comment:
   I added an override for `stringArgs`. Is that the desired way to address this?





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1772360493

   @cloud-fan I took a stab at implementing your suggestion, but the reproduction of the bug still fails. So either I made a mistake or I missed some other part of the code that needs to be updated. It would be great if you could provide some feedback.




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1786374844

   The failed streaming test is unrelated, and my last comment is quite minor, so let's merge it first to fix the correctness bug. Thanks for your great work!




Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368490194


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -295,7 +295,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     }
   }
 
-  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+  override def createShuffleSpec(distribution: ClusteredDistribution): HashShuffleSpec =

Review Comment:
   unnecessary change



##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala:
##########
@@ -71,6 +71,19 @@ class DatasetSuite extends QueryTest
 
   private implicit val ordering = Ordering.by((c: ClassData) => c.a -> c.b)
 
+  test("SPARK-45592: correctness issue") {

Review Comment:
   please refine the suite name



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -306,6 +306,34 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
+
+}
+
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])

Review Comment:
   shall we add `@transient` for `partitions` ?





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375884851


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala:
##########
@@ -75,7 +75,13 @@ case class AQEShuffleReadExec private(
       // partitions is changed.
       child.outputPartitioning match {
         case h: HashPartitioning =>
-          CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = partitionSpecs.length))
+          val partitions = partitionSpecs.map {
+            case CoalescedPartitionSpec(start, end, _) => CoalescedBoundary(start, end)
+            // Can not happend due to isCoalescedRead
+            case unexpected =>
+              throw new RuntimeException(s"Unexpected ShufflePartitionSpec: $unexpected")

Review Comment:
   use `SparkException.internalError` to indicate bugs
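
A rough, self-contained sketch of the pattern under review: convert each partition spec into a boundary and fail loudly on anything else, since that would indicate a bug rather than bad user input. The types below are hypothetical stand-ins, and `InternalBugException` is an invented placeholder for whatever `SparkException.internalError` produces in Spark itself.

```scala
object BoundarySketch {
  // Hypothetical stand-ins; NOT Spark's ShufflePartitionSpec hierarchy.
  sealed trait PartitionSpec
  final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int)
      extends PartitionSpec
  final case class PartialReducerSpec(reducerIndex: Int) extends PartitionSpec

  final case class Boundary(startReducerIndex: Int, endReducerIndex: Int)

  // Invented placeholder for an "internal error" exception type.
  final class InternalBugException(msg: String)
      extends IllegalStateException(s"[INTERNAL_ERROR] $msg")

  def toBoundaries(specs: Seq[PartitionSpec]): Seq[Boundary] = specs.map {
    case CoalescedSpec(start, end) => Boundary(start, end)
    // Callers are expected to pass only coalesced specs, so reaching this
    // branch means an invariant was broken somewhere upstream.
    case unexpected =>
      throw new InternalBugException(s"Unexpected ShufflePartitionSpec: $unexpected")
  }
}
```

Using a dedicated internal-error type, rather than a bare `RuntimeException`, makes it easier to tell invariant violations apart from ordinary runtime failures.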





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "maryannxue (via GitHub)" <gi...@apache.org>.
maryannxue commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1785553724

   @eejbyfeldt Can you briefly describe the triggering condition of this bug? Does it only occur when coalescing happens to produce just the exact number of partitions as the other side of the join?
   
   In the meantime, I'm wondering if it would be better to:
   1. not coalesce for the top/last shuffle of the physical plan of InMemoryTableScan
   2. have coalesce rule deal with `InMemoryTableScan` from the caller side (user of the cache)
   
   This PR, just to address the correctness issue, only needs to do 1. And we can do 2 (a little trickier I suppose) for performance improvement.




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "JJACOB0806 (via GitHub)" <gi...@apache.org>.
JJACOB0806 commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1888462184

   Hello, is there a timeline for the 3.5.1 release? We are facing this issue in 3.5.0 and would like to know when the next stable version will be rolled out.




Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1769985482

   `AQEShuffleRead` should probably return a different partitioning, e.g. `CoalescedHashPartitioning`. It still satisfies `ClusteredDistribution`, so `Aggregate` is fine and there will be no extra shuffle. For joins, two `CoalescedHashPartitioning`s are compatible if they have the same original partition number and the same coalesce boundaries, and `CoalescedHashPartitioning` is not compatible with `HashPartitioning`.
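
A minimal sketch of the compatibility rule described above, using plain case classes rather than Spark's `Partitioning`/`ShuffleSpec` types. `HashSpec`, `Boundary`, `CoalescedSpec`, and `compatible` are hypothetical names introduced for illustration; a real check would also have to compare the hash keys against the required distribution.

```scala
object CompatibilitySketch {
  sealed trait Spec
  // Plain hash partitioning into `numPartitions` partitions.
  final case class HashSpec(numPartitions: Int) extends Spec
  // Reducer range [start, end) of the original shuffle merged into one partition.
  final case class Boundary(start: Int, end: Int)
  // Hash partitioning whose reducers were coalesced along `boundaries`.
  final case class CoalescedSpec(from: HashSpec, boundaries: Seq[Boundary]) extends Spec

  def compatible(a: Spec, b: Spec): Boolean = (a, b) match {
    case (HashSpec(n), HashSpec(m)) => n == m
    // Same original partition number and the same coalesce boundaries.
    case (CoalescedSpec(f1, b1), CoalescedSpec(f2, b2)) => f1 == f2 && b1 == b2
    // A coalesced partitioning never matches a plain hash partitioning,
    // even when the resulting partition counts are equal.
    case _ => false
  }
}
```

Under such a rule, a side coalesced from 200 partitions down to 10 would no longer look interchangeable with a side shuffled directly into 10 partitions.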




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375883216


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala:
##########
@@ -146,63 +146,75 @@ class DistributionSuite extends SparkFunSuite {
       false)
   }
 
-  test("HashPartitioning is the output partitioning") {
-    // HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
-    // the required clustering expressions.
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c")),
-      true)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c")),
-      true)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"b", $"c")),
-      false)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"d", $"e")),
-      false)
-
-    // When ClusteredDistribution.requireAllClusterKeys is set to true,
-    // HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are
-    // exactly same as the required clustering expressions.
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
-      true)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
-      false)
-
-    checkSatisfied(
-      HashPartitioning(Seq($"b", $"a", $"c"), 10),
-      ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true),
-      false)
-
-    // HashPartitioning cannot satisfy OrderedDistribution
-    checkSatisfied(
-      HashPartitioning(Seq($"a", $"b", $"c"), 10),
-      OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
-      false)
+  private def testHashPartitioningLike(
+    partitioningName: String,

Review Comment:
   nit: 4 spaces indentation





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1376174938


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -306,6 +306,35 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
+
+}
+
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])
+  extends Expression with Partitioning with Unevaluable {
+
+  override def children: Seq[Expression] = from.expressions
+  override def nullable: Boolean = from.nullable
+  override def dataType: DataType = from.dataType
+
+  override def satisfies0(required: Distribution): Boolean = from.satisfies0(required)
+
+  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+    CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)
+
+  override protected def withNewChildrenInternal(
+    newChildren: IndexedSeq[Expression]): CoalescedHashPartitioning =
+      copy(from = from.copy(expressions = newChildren))
+
+  override val numPartitions: Int = partitions.length
+
+  override def toString: String = from.toString
+  override def sql: String = from.sql

Review Comment:
   On second thought, why do we need to hide `CoalescedHashPartitioning`? Can we run some example queries and check EXPLAIN and the SQL web UI?
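
To make the trade-off concrete, here is a small sketch with stand-in classes (`HashPart`, `HiddenCoalesced`, and `VisibleCoalesced` are hypothetical, and the string formats are illustrative, not necessarily what Spark prints). Delegating `toString` to the original partitioning, as the diff above does, makes the coalesced node indistinguishable from a plain hash partitioning in any printed plan.

```scala
object ToStringSketch {
  final case class HashPart(keys: Seq[String], numPartitions: Int) {
    override def toString: String =
      s"hashpartitioning(${keys.mkString(", ")}, $numPartitions)"
  }

  // Delegates toString, so the coalescing is invisible in printed plans.
  final case class HiddenCoalesced(from: HashPart, numCoalesced: Int) {
    override def toString: String = from.toString
  }

  // Prints its own name and partition count, so the coalescing is visible.
  final case class VisibleCoalesced(from: HashPart, numCoalesced: Int) {
    override def toString: String =
      s"coalescedhashpartitioning(${from.keys.mkString(", ")}, $numCoalesced)"
  }
}
```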





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "maryannxue (via GitHub)" <gi...@apache.org>.
maryannxue commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1786266047

   Synced with @cloud-fan offline: (2) in the above suggestion wouldn't work. Let's go ahead with the current fix.




Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1769841139

   I think the issue is that we propagate a coalesced shuffle exchange through `InMemoryTableScanExec`, and `EnsureRequirements` then uses the coalesced shuffle exchange to create the shuffle exchange on the other side.
   However, the two shuffle exchanges are not actually compatible: one side's shuffle starts as `HashPartitioning(200)` and is then coalesced to `HashPartitioning(10)`, while the other side's shuffle is a plain `HashPartitioning(10)`. This causes the join to produce wrong data.
   ```
                         Scan
                          |
                      Shuffle(200)
                          |
     Scan           AQEShuffleRead(10)
      |                   |
   Shuffle(10)   InMemoryTableScanExec
       \            /
            Join    
   ```
   
   BTW, if you set `spark.sql.shuffle.partitions=5`, I think this issue should be resolved.
   
   There are two places in the code related to this issue:
   1. `AQEShuffleRead` always assumes the coalesced partitioning is unchanged, so it only refreshes the partition number. I think this is based on the assumption that all initial shuffle partition numbers are the same, but that does not seem to hold. `EnsureRequirements` supports `shouldConsiderMinParallelism`, which can lead to different initial shuffle partition numbers within one query execution.
   2. `InMemoryTableScanExec` propagates the output partitioning. `InMemoryTableScanExec` introduces one more query execution, which also breaks the assumption of `AQEShuffleRead`.
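
The mismatch described above can be reproduced without Spark. This is a minimal sketch under two assumptions: a non-negative modulo of the key stands in for Spark's Murmur3-based `partitionIdExpression`, and the 200 reducers are coalesced into 10 equal contiguous ranges (real coalescing boundaries depend on partition sizes). `originalPartition`, `coalescedPartition`, and `directPartition` are hypothetical names.

```scala
object CoalesceMismatchDemo {
  // Stand-in for Pmod(hash(key), numPartitions); Spark hashes with Murmur3,
  // this sketch uses the key itself.
  def originalPartition(key: Int, numPartitions: Int): Int =
    Math.floorMod(key, numPartitions)

  // Side A: shuffle into 200 partitions, then coalesce contiguous reducer
  // ranges [0, 20), [20, 40), ... into 10 partitions (assumed equal-sized).
  def coalescedPartition(key: Int): Int = originalPartition(key, 200) / 20

  // Side B: shuffle directly into 10 partitions.
  def directPartition(key: Int): Int = originalPartition(key, 10)

  // Keys that land in different partition indexes on the two sides.
  val mismatched: Seq[Int] =
    (0 until 200).filter(k => coalescedPartition(k) != directPartition(k))

  def main(args: Array[String]): Unit = {
    // Key 25: 25 % 200 = 25, which falls in range [20, 40), so partition 1 on side A,
    // but 25 % 10 = 5, so partition 5 on side B.
    println(s"key 25: coalesced=${coalescedPartition(25)}, direct=${directPartition(25)}")
    println(s"${mismatched.size} of 200 keys disagree")
  }
}
```

Both sides report 10 partitions, so a join that pairs partition i on one side with partition i on the other would miss matches for every key in `mismatched`.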
   




Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368087357


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -295,19 +284,53 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     }
   }
 
-  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
-    HashShuffleSpec(this, distribution)
-
   /**
    * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less
    * than numPartitions) based on hashing expressions.
    */
   def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
+}
+
+/**
+ * Represents a partitioning where rows are split up across partitions based on the hash
+ * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
+ * in the same partition.
+ *
+ * Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
+ * stateful operators to retain the same physical partitioning during the lifetime of the query
+ * (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
+ * across Spark versions. Violation of this requirement may bring silent correctness issue.
+ */
+case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+  extends HashPartitioningBase {
+
+  override def createShuffleSpec(distribution: ClusteredDistribution): HashShuffleSpec =
+    HashShuffleSpec(this, distribution)
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
 }
 
+case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)
+
+/**
+ * Represents a partitioning where partitions have been coalesced from a HashPartitioning into a
+ * fewer number of partitions.
+ */
+case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[CoalescedBoundary])

Review Comment:
   We'd better avoid showing the details of `partitions` in EXPLAIN and the UI.





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368085777


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -700,14 +723,38 @@ case class HashShuffleSpec(
     }
   }
 
-  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+  override def createPartitioning(clustering: Seq[Expression]): HashPartitioning = {
     val exprs = hashKeyPositions.map(v => clustering(v.head))
     HashPartitioning(exprs, partitioning.numPartitions)
   }
 
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: HashShuffleSpec,
+    partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      numPartitions == 1
+    case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
+      partitions == otherPartitions &&
+      from.isCompatibleWith(otherParent)
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ =>
+      false
+  }
+
+  override def canCreatePartitioning: Boolean = from.canCreatePartitioning
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning =
+    CoalescedHashPartitioning(from.createPartitioning(clustering), partitions)

Review Comment:
   So we do not need to override `createPartitioning`, since we cannot create a partitioning.





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368213756


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -700,14 +723,38 @@ case class HashShuffleSpec(
     }
   }
 
-  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+  override def createPartitioning(clustering: Seq[Expression]): HashPartitioning = {

Review Comment:
   Not needed; I was just being sloppy while trying out another approach. Reverted.








Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368214012


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -700,14 +723,38 @@ case class HashShuffleSpec(
     }
   }
 
-  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+  override def createPartitioning(clustering: Seq[Expression]): HashPartitioning = {
     val exprs = hashKeyPositions.map(v => clustering(v.head))
     HashPartitioning(exprs, partitioning.numPartitions)
   }
 
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: HashShuffleSpec,
+    partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      numPartitions == 1
+    case CoalescedHashShuffleSpec(otherParent, otherPartitions) =>
+      partitions == otherPartitions &&
+      from.isCompatibleWith(otherParent)
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ =>
+      false
+  }
+
+  override def canCreatePartitioning: Boolean = from.canCreatePartitioning
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning =
+    CoalescedHashPartitioning(from.createPartitioning(clustering), partitions)

Review Comment:
   Removed.





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #43435: [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec
URL: https://github.com/apache/spark/pull/43435




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375882836


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -708,6 +736,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    @transient partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

Review Comment:
   What's the rationale for `@transient` here?





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368507835


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -295,7 +295,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     }
   }
 
-  override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
+  override def createShuffleSpec(distribution: ClusteredDistribution): HashShuffleSpec =

Review Comment:
   The refined return type of `createShuffleSpec` was used to get a more specific type in `CoalescedHashShuffleSpec`, but it is not strictly needed, so I will revert this change.





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1368087491


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -700,14 +723,38 @@ case class HashShuffleSpec(
     }
   }
 
-  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+  override def createPartitioning(clustering: Seq[Expression]): HashPartitioning = {

Review Comment:
   Why do we need this change?





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1784766909

   Great work! With this change, we can do some cleanup in `ShuffledJoin`:
   ```
     override def requiredChildDistribution: Seq[Distribution] = {
       if (isSkewJoin) {
         // We re-arrange the shuffle partitions to deal with skew join, and the new children
         // partitioning doesn't satisfy `HashClusteredDistribution`.
         UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
       } else {
         ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
       }
     }
   ```
   We don't need this hack anymore and can safely remove the `if` branch.
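
If the `if` branch is removed as suggested, the method would presumably reduce to a single expression. The sketch below uses stand-in types (`Distribution`, `ClusteredDistribution`, and a trimmed-down `Join`, all hypothetical here) so that it compiles without Spark; it is not the actual `ShuffledJoin` code.

```scala
object ShuffledJoinSketch {
  // Stand-ins for Spark's Distribution hierarchy, so the sketch compiles on its own.
  sealed trait Distribution
  final case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

  // Hypothetical, trimmed-down join node with no isSkewJoin special case.
  final case class Join(leftKeys: Seq[String], rightKeys: Seq[String]) {
    def requiredChildDistribution: Seq[Distribution] =
      ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
  }
}
```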




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375942459


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -708,6 +736,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    @transient partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

Review Comment:
   I thought it would be held somewhere... After some searching, it seems `ShuffleSpec` is created dynamically, so there is no need to consider ser/de. We can remove `@transient`.
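
For context, a minimal sketch of what `@transient` does under plain Java serialization, assuming a hypothetical `Spec` case class rather than Spark's `CoalescedHashShuffleSpec`: the annotated field is skipped when the object is written and is `null` after deserialization. If `ShuffleSpec` instances are never serialized, as concluded above, the annotation has no observable effect.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientDemo {
  // Hypothetical stand-in; not Spark's CoalescedHashShuffleSpec.
  final case class Spec(name: String, @transient boundaries: Seq[Int])

  // Serialize and deserialize a value with plain Java serialization.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(Spec("coalesced", Seq(0, 100, 200)))
    // The non-transient field survives; the transient one comes back as null.
    println(copy.name)
    println(copy.boundaries == null)
  }
}
```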





Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43435:
URL: https://github.com/apache/spark/pull/43435#discussion_r1375912650


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala:
##########
@@ -708,6 +736,26 @@ case class HashShuffleSpec(
   override def numPartitions: Int = partitioning.numPartitions
 }
 
+case class CoalescedHashShuffleSpec(
+    from: ShuffleSpec,
+    @transient partitions: Seq[CoalescedBoundary]) extends ShuffleSpec {

Review Comment:
   Which plan may hold `ShuffleSpec`? And do we need to do the same for other `ShuffleSpec`s?





Re: [PR] [SPARK-45592][SQL][WIP] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "eejbyfeldt (via GitHub)" <gi...@apache.org>.
eejbyfeldt commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1768525411

   Tagging @cloud-fan and @ulysses-you since they created PRs in this area and might know a better way of fixing the bug.




Re: [PR] [SPARK-45592][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

Posted by "deepakcv (via GitHub)" <gi...@apache.org>.
deepakcv commented on PR #43435:
URL: https://github.com/apache/spark/pull/43435#issuecomment-1888466670

   Hi, is there a tentative timeline for releasing spark-3.5.1 with this fix?

