Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/06/27 00:58:49 UTC

[iceberg] branch master updated: Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d98e7a1152 Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)
d98e7a1152 is described below

commit d98e7a115259d92695a22ac301f7bf66649d7396
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Jun 26 17:58:42 2023 -0700

    Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)
---
 .../v2/ExtendedDataSourceV2Strategy.scala          |  5 ++
 .../extensions/TestRewriteDataFilesProcedure.java  | 71 ++++++++++++++++++++++
 .../spark/actions/SparkShufflingDataRewriter.java  | 47 +++++++++++++-
 .../plans/logical/OrderAwareCoalesce.scala         | 51 ++++++++++++++++
 .../sql/execution/OrderAwareCoalesceExec.scala     | 59 ++++++++++++++++++
 .../spark/actions/TestRewriteDataFilesAction.java  |  9 +++
 .../spark/actions/TestSparkFileRewriter.java       |  2 +
 7 files changed, 242 insertions(+), 2 deletions(-)
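
For reference, the tests added below exercise the new 'shuffle-partitions-per-file' option both
through the rewrite_data_files procedure and through the RewriteDataFiles action API. A minimal
action-side sketch follows; the table handle, the "c1" sort column, and the option values are
illustrative assumptions, not part of this commit, and the option is only accepted when the
Iceberg Spark session extensions are enabled:

    import org.apache.iceberg.SortOrder;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class CompactWithCoalesce {
      // `table` is assumed to be loaded elsewhere (e.g. via a Catalog); "c1" is a placeholder column
      static RewriteDataFiles.Result compact(Table table) {
        return SparkActions.get()
            .rewriteDataFiles(table)
            .sort(SortOrder.builderFor(table.schema()).asc("c1").build())
            // split each output file across 4 shuffle partitions (e.g. ~2 GB files, ~512 MB shuffles)
            .option("shuffle-partitions-per-file", "4")
            .execute();
      }
    }

The equivalent SQL path is the CALL ...rewrite_data_files(...) form used in
TestRewriteDataFilesProcedure below.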

diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index aa81e38f29..2f67508c1f 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropTag
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows
 import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
@@ -47,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.OrderAwareCoalesceExec
 import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
 
@@ -111,6 +113,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case NoStatsUnaryNode(child) =>
       planLater(child) :: Nil
 
+    case OrderAwareCoalesce(numPartitions, coalescer, child) =>
+      OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
+
     case _ => Nil
   }
 
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 3ed47d54d3..88fdf29207 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -184,6 +184,41 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
+    createTable();
+    insertData(10 /* file count */);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files("
+                + " table => '%s', "
+                + " strategy => 'sort', "
+                + " sort_order => 'c1', "
+                + " options => map('shuffle-partitions-per-file', '2'))",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // as there is only one small output file, validate the query ordering (it will not change)
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
+  }
+
   @Test
   public void testRewriteDataFilesWithZOrder() {
     createTable();
@@ -225,6 +260,42 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
+    createTable();
+    insertData(10 /* file count */);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files("
+                + " table => '%s', "
+                + "strategy => 'sort', "
+                + " sort_order => 'zorder(c1, c2)', "
+                + " options => map('shuffle-partitions-per-file', '2'))",
+            catalogName, tableIdent);
+
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+
+    // due to z-ordering, the data will be written in the below order
+    // as there is only one small output file, validate the query ordering (it will not change)
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(2, "bar", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null),
+            row(1, "foo", null));
+    assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
+  }
+
   @Test
   public void testRewriteDataFilesWithFilter() {
     createTable();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index 1ee469b090..c9c962526e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -36,6 +36,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce;
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer;
 import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.distributions.Distributions;
 import org.apache.spark.sql.connector.distributions.OrderedDistribution;
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB or higher may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";
+
+  public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;
+
   private double compressionFactor;
+  private int numShufflePartitionsPerFile;
 
   protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
     super(spark, table);
@@ -75,6 +94,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
     return ImmutableSet.<String>builder()
         .addAll(super.validOptions())
         .add(COMPRESSION_FACTOR)
+        .add(SHUFFLE_PARTITIONS_PER_FILE)
         .build();
   }
 
@@ -82,6 +102,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
   public void init(Map<String, String> options) {
     super.init(options);
     this.compressionFactor = compressionFactor(options);
+    this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options);
   }
 
   @Override
@@ -114,7 +135,16 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
   private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
     SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
     OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
-    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+    LogicalPlan sortPlan =
+        DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+
+    if (numShufflePartitionsPerFile == 1) {
+      return sortPlan;
+    } else {
+      OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
+      int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
+      return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
+    }
   }
 
   private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
@@ -134,7 +164,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   private int numShufflePartitions(List<FileScanTask> group) {
     int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
-    return Math.max(1, numOutputFiles);
+    return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
   }
 
   private double compressionFactor(Map<String, String> options) {
@@ -145,6 +175,19 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
     return value;
   }
 
+  private int numShufflePartitionsPerFile(Map<String, String> options) {
+    int value =
+        PropertyUtil.propertyAsInt(
+            options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
+    Preconditions.checkArgument(
+        value == 1 || Spark3Util.extensionsEnabled(spark()),
+        "Using '%s' requires enabling Iceberg Spark session extensions",
+        SHUFFLE_PARTITIONS_PER_FILE);
+    return value;
+  }
+
   private static class OrderedWrite implements RequiresDistributionAndOrdering {
     private final OrderedDistribution distribution;
     private final SortOrder[] ordering;
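
To make the sizing change in numShufflePartitions() above concrete, here is a hedged
back-of-the-envelope sketch. The figures are invented for illustration, and the real rewriter also
applies the compression factor before computing the output file count:

    public class ShufflePartitionMath {
      public static void main(String[] args) {
        long inputSize = 40L * 1024 * 1024 * 1024;       // 40 GB rewrite group (assumed)
        long targetFileSize = 2L * 1024 * 1024 * 1024;   // 2 GB target output files (assumed)
        int shufflePartitionsPerFile = 4;                // cluster comfortable with ~512 MB shuffles

        int numOutputFiles = (int) Math.ceil((double) inputSize / targetFileSize);         // 20
        int numShufflePartitions = Math.max(1, numOutputFiles * shufflePartitionsPerFile);  // 80
        int numCoalescedPartitions = numShufflePartitions / shufflePartitionsPerFile;       // 20

        // the range shuffle produces 80 sorted partitions of ~512 MB; the order-aware coalesce
        // then stitches every 4 consecutive partitions back into one ~2 GB sorted output file
        System.out.printf("shuffle into %d partitions, write %d files%n",
            numShufflePartitions, numCoalescedPartitions);
      }
    }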
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala
new file mode 100644
index 0000000000..5acaa6800e
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.PartitionGroup
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+// this node doesn't extend RepartitionOperation on purpose to keep this logic isolated
+// and ignore it in optimizer rules such as CollapseRepartition
+case class OrderAwareCoalesce(
+    numPartitions: Int,
+    coalescer: PartitionCoalescer,
+    child: LogicalPlan) extends OrderPreservingUnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+class OrderAwareCoalescer(val groupSize: Int) extends PartitionCoalescer with Serializable {
+
+  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+    val partitionBins = parent.partitions.grouped(groupSize)
+    partitionBins.map { partitions =>
+      val group = new PartitionGroup()
+      group.partitions ++= partitions
+      group
+    }.toArray
+  }
+}
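
The coalescer above bins consecutive shuffle partitions (Scala's grouped); because the preceding
range shuffle assigns adjacent sort-key ranges to adjacent partition indices, concatenating each
bin keeps every output file internally sorted. A small illustrative sketch of the same grouping in
plain Java, with made-up partition count and group size:

    import java.util.ArrayList;
    import java.util.List;

    public class ConsecutiveBins {
      public static void main(String[] args) {
        int numShufflePartitions = 8;  // assumed sorted, range-partitioned shuffle output
        int groupSize = 2;             // shuffle-partitions-per-file

        List<List<Integer>> bins = new ArrayList<>();
        for (int start = 0; start < numShufflePartitions; start += groupSize) {
          List<Integer> bin = new ArrayList<>();
          for (int p = start; p < Math.min(start + groupSize, numShufflePartitions); p++) {
            bin.add(p);
          }
          bins.add(bin);
        }
        // prints [[0, 1], [2, 3], [4, 5], [6, 7]] -> each bin becomes one sorted data file
        System.out.println(bins);
      }
    }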
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala
new file mode 100644
index 0000000000..2ef9955052
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+
+case class OrderAwareCoalesceExec(
+    numPartitions: Int,
+    coalescer: PartitionCoalescer,
+    child: SparkPlan) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+    if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val result = child.execute()
+    if (numPartitions == 1 && result.getNumPartitions < 1) {
+      // make sure we don't output an RDD with 0 partitions,
+      // when claiming that we have a `SinglePartition`
+      // see CoalesceExec in Spark
+      new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
+    } else {
+      result.coalesce(numPartitions, shuffle = false, Some(coalescer))
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bf4bef74c3..0cb9103643 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -894,6 +894,15 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
             () -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid rewrite job order name: foo");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                basicRewrite(table)
+                    .sort(SortOrder.builderFor(table.schema()).asc("c2").build())
+                    .option(SparkShufflingDataRewriter.SHUFFLE_PARTITIONS_PER_FILE, "5")
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("requires enabling Iceberg Spark session extensions");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
index 6800ffd404..055e5be681 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
@@ -261,6 +261,7 @@ public class TestSparkFileRewriter extends SparkTestBase {
     Assert.assertEquals(
         "Rewriter must report all supported options",
         ImmutableSet.of(
+            SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
             SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
             SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
             SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
@@ -281,6 +282,7 @@ public class TestSparkFileRewriter extends SparkTestBase {
     Assert.assertEquals(
         "Rewriter must report all supported options",
         ImmutableSet.of(
+            SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
             SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
             SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
             SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,