Posted to reviews@spark.apache.org by "EnricoMi (via GitHub)" <gi...@apache.org> on 2023/03/08 11:20:39 UTC

[GitHub] [spark] EnricoMi opened a new pull request, #40334: Support key-grouped partitioning without HasPartitionKey

EnricoMi opened a new pull request, #40334:
URL: https://github.com/apache/spark/pull/40334

   ### What changes were proposed in this pull request?
   A `DataSourceV2` reporting a `KeyGroupedPartitioning` through `SupportsReportPartitioning` does not have to implement `HasPartitionKey`, and is thus not limited to a single key per partition.
   
   ### Why are the changes needed?
   Before Spark 3.3, `DataSourceV2` implementations could report a `ClusteredDistribution`, which allowed Spark to exploit the existing partitioning and avoid an extra hash partitioning shuffle step. Transformations like `groupBy` or window functions (with the right group / partitioning keys) would then be executed on the partitioning provided by the data source.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Performance improves because Spark reuses the existing partitioning of the data source instead of adding a shuffle.
   
   ### How was this patch tested?
   Existing tests have been fixed: they previously reported partitions containing multiple keys through `HasPartitionKey`, which was incorrect.
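
To illustrate the distinction drawn in the description, here is a plain-Scala sketch with no Spark dependency. The names `KeyGroupingSketch`, `isClusteredBy` and `hasSingleKeyPerPartition` are hypothetical and are not Spark APIs; the sketch only models each partition by its values of column `i`. It uses the two partitions of the test source in this PR, which are clustered by `i` but hold more than one key each.

```scala
// Plain-Scala model of the two properties; not Spark code, all names are hypothetical.
object KeyGroupingSketch {
  // A partition is modelled as the values of column `i` that it contains.
  type Part = Seq[Int]

  // Clustered by `i`: every key value occurs in at most one partition.
  def isClusteredBy(parts: Seq[Part]): Boolean = {
    val keys = parts.flatMap(_.distinct)
    keys.distinct.size == keys.size
  }

  // Stricter shape: each partition holds exactly one key value.
  def hasSingleKeyPerPartition(parts: Seq[Part]): Boolean =
    parts.forall(_.distinct.size == 1)

  def main(args: Array[String]): Unit = {
    // Column `i` of the two partitions in PartitionAwareDataSource.
    val parts = Seq(Seq(1, 1, 3), Seq(2, 4, 4))
    println(isClusteredBy(parts))            // prints: true
    println(hasSingleKeyPerPartition(parts)) // prints: false
  }
}
```

Under this model, a source with such partitions could skip a shuffle for a `groupBy` on `i`, yet it cannot describe each partition with a single key.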


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] EnricoMi commented on pull request #40334: [SPARK-42716][SQL] DataSourceV2 supports reporting key-grouped partitioning without HasPartitionKey

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #40334:
URL: https://github.com/apache/spark/pull/40334#issuecomment-1477439623

   @cloud-fan, is reporting a clustered distribution still supported? Data sources should be able to report that their data is partitioned by some columns without reporting the actual partitioning mechanism (such as hash). `groupBy` or window functions that partition on those columns can then reuse that fact.




[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40334: [SPARK-42716][SQL] DataSourceV2 supports reporting key-grouped partitioning without HasPartitionKey

Posted by "jaceklaskowski (via GitHub)" <gi...@apache.org>.
jaceklaskowski commented on code in PR #40334:
URL: https://github.com/apache/spark/pull/40334#discussion_r1144552053


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -945,98 +987,102 @@ object ColumnarReaderFactory extends PartitionReaderFactory {
   }
 }
 
-class PartitionAwareDataSource extends TestingV2Source {
+class MyScanBuilder(partitions: Array[InputPartition],
+                    partitionKeys: Option[Seq[String]],
+                    orderKeys: Option[Seq[String]] = None) extends SimpleScanBuilder
+  with SupportsReportPartitioning with SupportsReportOrdering {
 
-  class MyScanBuilder extends SimpleScanBuilder
-    with SupportsReportPartitioning {
+  override def planInputPartitions(): Array[InputPartition] = partitions
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // Note that we don't have same value of column `i` across partitions.
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2)))
-    }
+  override def createReaderFactory(): PartitionReaderFactory = {
+    SpecificReaderFactory
+  }
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
-    }
+  override def outputPartitioning(): Partitioning =
+    partitionKeys.map(keys =>
+      new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, partitions.length)
+    ).getOrElse(

Review Comment:
   nit: Replace `(` with `{`
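
A minimal sketch of the brace style the nit appears to ask for, assuming the common Scala convention of writing multi-line arguments as `{ ... }` blocks. The `Partitioning`, `KeyGrouped` and `Unknown` types below are hypothetical stand-ins so that the snippet compiles without Spark.

```scala
object GetOrElseStyle {
  // Hypothetical stand-ins for the partitioning classes used in the diff.
  sealed trait Partitioning
  final case class KeyGrouped(keys: Seq[String], numPartitions: Int) extends Partitioning
  final case class Unknown(numPartitions: Int) extends Partitioning

  // Same logic as the diff, with braces around the multi-line arguments.
  def outputPartitioning(
      partitionKeys: Option[Seq[String]],
      numPartitions: Int): Partitioning =
    partitionKeys.map { keys =>
      KeyGrouped(keys, numPartitions)
    }.getOrElse {
      Unknown(numPartitions)
    }
}
```

Both spellings behave the same, since `Option.getOrElse` takes a by-name argument; the difference is purely stylistic.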



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala:
##########
@@ -945,98 +987,102 @@ object ColumnarReaderFactory extends PartitionReaderFactory {
   }
 }
 
-class PartitionAwareDataSource extends TestingV2Source {
+class MyScanBuilder(partitions: Array[InputPartition],
+                    partitionKeys: Option[Seq[String]],
+                    orderKeys: Option[Seq[String]] = None) extends SimpleScanBuilder
+  with SupportsReportPartitioning with SupportsReportOrdering {
 
-  class MyScanBuilder extends SimpleScanBuilder
-    with SupportsReportPartitioning {
+  override def planInputPartitions(): Array[InputPartition] = partitions
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // Note that we don't have same value of column `i` across partitions.
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2)))
-    }
+  override def createReaderFactory(): PartitionReaderFactory = {
+    SpecificReaderFactory
+  }
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
-    }
+  override def outputPartitioning(): Partitioning =
+    partitionKeys.map(keys =>
+      new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, partitions.length)
+    ).getOrElse(
+      new UnknownPartitioning(partitions.length)
+    )
 
-    override def outputPartitioning(): Partitioning =
-      new KeyGroupedPartitioning(Array(FieldReference("i")), 2)
-  }
+  override def outputOrdering(): Array[SortOrder] = orderKeys.map(_.map(
+    new MySortOrder(_)
+  )).getOrElse(Seq.empty).toArray
+}
 
-  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
-    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-      new MyScanBuilder()
-    }
-  }
+class MySortOrder(columnName: String) extends SortOrder {
+  override def expression(): Expression = new MyIdentityTransform(
+    new MyNamedReference(columnName)
+  )
+  override def direction(): SortDirection = SortDirection.ASCENDING
+  override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
 }
 
-class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+class MyNamedReference(parts: String*) extends NamedReference {
+  override def fieldNames(): Array[String] = parts.toArray
+}
 
-  class MyScanBuilder(
-      val partitionKeys: Option[Seq[String]],
-      val orderKeys: Seq[String])
-    extends SimpleScanBuilder
-    with SupportsReportPartitioning with SupportsReportOrdering {
+class MyIdentityTransform(namedReference: NamedReference) extends Transform {
+  override def name(): String = "identity"
+  override def references(): Array[NamedReference] = Array.empty
+  override def arguments(): Array[Expression] = Seq(namedReference).toArray
+}
 
-    override def planInputPartitions(): Array[InputPartition] = {
-      // data are partitioned by column `i` or `j`, so we can report any partitioning
-      // column `i` is not ordered globally, but within partitions, together with`j`
-      // this allows us to report ordering by [i] and [i, j]
-      Array(
-        SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
-        SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2)))
-    }
+class PartitionAwareDataSource extends TestingV2Source {
+  // Note that we don't have same value of column `i` across partitions.
+  val partitions: Array[InputPartition] =
+    Array(
+      new SpecificInputPartition(Array(1, 1, 3), Array(4, 4, 6)),
+      new SpecificInputPartition(Array(2, 4, 4), Array(6, 2, 2))
+    )
 
-    override def createReaderFactory(): PartitionReaderFactory = {
-      SpecificReaderFactory
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+      new MyScanBuilder(partitions, Some(Seq("i")))
     }
+  }
+}
 
-    override def outputPartitioning(): Partitioning = {
-      partitionKeys.map(keys =>
-        new KeyGroupedPartitioning(keys.map(FieldReference(_)).toArray, 2)
-      ).getOrElse(
-        new UnknownPartitioning(2)
-      )
-    }
+class PartitionAwareDataSourceWithKey extends PartitionAwareDataSource {
+  // Note that we don't have same value of column `i` across partitions.
+  // Note that we have only a single value for column `i` per partition.
+  override val partitions: Array[InputPartition] =
+    Array(
+      new SpecificInputPartitionWithKey(Array(1, 1), Array(4, 4)),
+      new SpecificInputPartitionWithKey(Array(3), Array(6)),
+      new SpecificInputPartitionWithKey(Array(2), Array(6)),
+      new SpecificInputPartitionWithKey(Array(4), Array(2)),
+      new SpecificInputPartitionWithKey(Array(4), Array(2))
+    )
+}
 
-    override def outputOrdering(): Array[SortOrder] = orderKeys.map(
-      new MySortOrder(_)
-    ).toArray
-  }
+class OrderAndPartitionAwareDataSource extends PartitionAwareDataSource {
+  // data are partitioned by column `i` or `j`, so we can report any partitioning
+  // column `i` is not ordered globally, but within partitions, together with`j`
+  // this allows us to report ordering by [i] and [i, j]
+  override val partitions: Array[InputPartition] =
+  Array(
+    new SpecificInputPartition(Array(1, 1, 3), Array(4, 5, 5)),
+    new SpecificInputPartition(Array(2, 4, 4), Array(6, 1, 2))
+  )
 
   override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder(
+        partitions,
         Option(options.get("partitionKeys")).map(_.split(",")),
-        Option(options.get("orderKeys")).map(_.split(",").toSeq).getOrElse(Seq.empty)
+        Option(options.get("orderKeys")).map(_.split(",").toSeq)
       )
     }
   }
-
-  class MySortOrder(columnName: String) extends SortOrder {
-    override def expression(): Expression = new MyIdentityTransform(
-      new MyNamedReference(columnName)
-    )
-    override def direction(): SortDirection = SortDirection.ASCENDING
-    override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
-  }
-
-  class MyNamedReference(parts: String*) extends NamedReference {
-    override def fieldNames(): Array[String] = parts.toArray
-  }
-
-  class MyIdentityTransform(namedReference: NamedReference) extends Transform {
-    override def name(): String = "identity"
-    override def references(): Array[NamedReference] = Array.empty
-    override def arguments(): Array[Expression] = Seq(namedReference).toArray
-  }
 }
 
-case class SpecificInputPartition(
-    i: Array[Int],
-    j: Array[Int]) extends InputPartition with HasPartitionKey {
+class SpecificInputPartition(val i: Array[Int], val j: Array[Int]) extends InputPartition
+
+class SpecificInputPartitionWithKey(i: Array[Int], j: Array[Int])
+  extends SpecificInputPartition(i, j) with HasPartitionKey {
+  assert(i.nonEmpty)
+  assert(i.forall(i.head == _))

Review Comment:
   `i.distinct.size == 1`?
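
A small plain-Scala comparison of the two checks, offered as a sketch (the object and method names are hypothetical). It suggests that the single condition `i.distinct.size == 1` covers both assertions in the diff, because `forall` is vacuously true on an empty array and therefore needs the separate `nonEmpty` check.

```scala
object SingleKeyCheck {
  // The two assertions from the diff, combined into one predicate.
  def viaForall(i: Array[Int]): Boolean = i.nonEmpty && i.forall(i.head == _)

  // The alternative suggested in the review comment.
  def viaDistinct(i: Array[Int]): Boolean = i.distinct.size == 1

  def main(args: Array[String]): Unit = {
    val samples = Seq(Array.empty[Int], Array(4), Array(1, 1), Array(1, 1, 3))
    samples.foreach { s =>
      val shown = s.mkString("[", ",", "]")
      // Both predicates agree on every sample, including the empty array.
      println(shown + ": forall=" + viaForall(s) + ", distinct=" + viaDistinct(s))
    }
  }
}
```

The `forall` variant can stop at the first mismatch, while `distinct` builds an intermediate array; for small test fixtures like these the difference should not matter.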



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:
##########
@@ -98,7 +98,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
         case Some(exprs) if KeyGroupedPartitioning.supportsExpressions(exprs) =>
           groupedPartitions.map { partitionValues =>
             KeyGroupedPartitioning(exprs, partitionValues.size, partitionValues.map(_._1))
-          }.getOrElse(super.outputPartitioning)
+          }.getOrElse(

Review Comment:
   nit: Replace `(` with `{`






Re: [PR] [SPARK-42716][SQL] DataSourceV2 supports reporting key-grouped partitioning without HasPartitionKey [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #40334:
URL: https://github.com/apache/spark/pull/40334#issuecomment-1752204753

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




Re: [PR] [SPARK-42716][SQL] DataSourceV2 supports reporting key-grouped partitioning without HasPartitionKey [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #40334: [SPARK-42716][SQL] DataSourceV2 supports reporting key-grouped partitioning without HasPartitionKey
URL: https://github.com/apache/spark/pull/40334

