You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/06/27 00:58:49 UTC
[iceberg] branch master updated: Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d98e7a1152 Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)
d98e7a1152 is described below
commit d98e7a115259d92695a22ac301f7bf66649d7396
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Jun 26 17:58:42 2023 -0700
Spark 3.4: Multiple shuffle partitions per file in compaction (#7897)
---
.../v2/ExtendedDataSourceV2Strategy.scala | 5 ++
.../extensions/TestRewriteDataFilesProcedure.java | 71 ++++++++++++++++++++++
.../spark/actions/SparkShufflingDataRewriter.java | 47 +++++++++++++-
.../plans/logical/OrderAwareCoalesce.scala | 51 ++++++++++++++++
.../sql/execution/OrderAwareCoalesceExec.scala | 59 ++++++++++++++++++
.../spark/actions/TestRewriteDataFilesAction.java | 9 +++
.../spark/actions/TestSparkFileRewriter.java | 2 +
7 files changed, 242 insertions(+), 2 deletions(-)
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index aa81e38f29..2f67508c1f 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropTag
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeRows
import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
@@ -47,6 +48,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.OrderAwareCoalesceExec
import org.apache.spark.sql.execution.SparkPlan
import scala.jdk.CollectionConverters._
@@ -111,6 +113,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case NoStatsUnaryNode(child) =>
planLater(child) :: Nil
+ case OrderAwareCoalesce(numPartitions, coalescer, child) =>
+ OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
+
case _ => Nil
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 3ed47d54d3..88fdf29207 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -184,6 +184,41 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
+ createTable();
+ insertData(10 /* file count */);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files("
+ + " table => '%s', "
+ + " strategy => 'sort', "
+ + " sort_order => 'c1', "
+ + " options => map('shuffle-partitions-per-file', '2'))",
+ catalogName, tableIdent);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // as there is only one small output file, validate the query ordering (it will not change)
+ ImmutableList<Object[]> expectedRows =
+ ImmutableList.of(
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null));
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
+ }
+
@Test
public void testRewriteDataFilesWithZOrder() {
createTable();
@@ -225,6 +260,42 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}
+ @Test
+ public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
+ createTable();
+ insertData(10 /* file count */);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files("
+ + " table => '%s', "
+ + "strategy => 'sort', "
+ + " sort_order => 'zorder(c1, c2)', "
+ + " options => map('shuffle-partitions-per-file', '2'))",
+ catalogName, tableIdent);
+
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+
+ // due to z-ordering, the data will be written in the below order
+ // as there is only one small output file, validate the query ordering (it will not change)
+ ImmutableList<Object[]> expectedRows =
+ ImmutableList.of(
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(2, "bar", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null),
+ row(1, "foo", null));
+ assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
+ }
+
@Test
public void testRewriteDataFilesWithFilter() {
createTable();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index 1ee469b090..c9c962526e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -36,6 +36,8 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce;
+import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalescer;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
+ /**
+ * The number of shuffle partitions to use for each output file. By default, this file rewriter
+ * assumes each shuffle partition would become a separate output file. Attempting to generate
+ * large output files of 512 MB or higher may strain the memory resources of the cluster as such
+ * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+ * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+ * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+ * use a custom coalesce operation to stitch these sorted partitions back together into a single
+ * sorted file.
+ *
+ * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+ */
+ public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";
+
+ public static final int SHUFFLE_PARTITIONS_PER_FILE_DEFAULT = 1;
+
private double compressionFactor;
+ private int numShufflePartitionsPerFile;
protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
super(spark, table);
@@ -75,6 +94,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.add(COMPRESSION_FACTOR)
+ .add(SHUFFLE_PARTITIONS_PER_FILE)
.build();
}
@@ -82,6 +102,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
public void init(Map<String, String> options) {
super.init(options);
this.compressionFactor = compressionFactor(options);
+ this.numShufflePartitionsPerFile = numShufflePartitionsPerFile(options);
}
@Override
@@ -114,7 +135,16 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
- return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+ LogicalPlan sortPlan =
+ DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+
+ if (numShufflePartitionsPerFile == 1) {
+ return sortPlan;
+ } else {
+ OrderAwareCoalescer coalescer = new OrderAwareCoalescer(numShufflePartitionsPerFile);
+ int numOutputPartitions = numShufflePartitions / numShufflePartitionsPerFile;
+ return new OrderAwareCoalesce(numOutputPartitions, coalescer, sortPlan);
+ }
}
private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
@@ -134,7 +164,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
private int numShufflePartitions(List<FileScanTask> group) {
int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
- return Math.max(1, numOutputFiles);
+ return Math.max(1, numOutputFiles * numShufflePartitionsPerFile);
}
private double compressionFactor(Map<String, String> options) {
@@ -145,6 +175,19 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
return value;
}
+ private int numShufflePartitionsPerFile(Map<String, String> options) {
+ int value =
+ PropertyUtil.propertyAsInt(
+ options, SHUFFLE_PARTITIONS_PER_FILE, SHUFFLE_PARTITIONS_PER_FILE_DEFAULT);
+ Preconditions.checkArgument(
+ value > 0, "'%s' is set to %s but must be > 0", SHUFFLE_PARTITIONS_PER_FILE, value);
+ Preconditions.checkArgument(
+ value == 1 || Spark3Util.extensionsEnabled(spark()),
+ "Using '%s' requires enabling Iceberg Spark session extensions",
+ SHUFFLE_PARTITIONS_PER_FILE);
+ return value;
+ }
+
private static class OrderedWrite implements RequiresDistributionAndOrdering {
private final OrderedDistribution distribution;
private final SortOrder[] ordering;
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala
new file mode 100644
index 0000000000..5acaa6800e
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/OrderAwareCoalesce.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.PartitionGroup
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+// this node doesn't extend RepartitionOperation on purpose to keep this logic isolated
+// and ignore it in optimizer rules such as CollapseRepartition
+case class OrderAwareCoalesce(
+ numPartitions: Int,
+ coalescer: PartitionCoalescer,
+ child: LogicalPlan) extends OrderPreservingUnaryNode {
+
+ override def output: Seq[Attribute] = child.output
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+ copy(child = newChild)
+ }
+}
+
+class OrderAwareCoalescer(val groupSize: Int) extends PartitionCoalescer with Serializable {
+
+ override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+ val partitionBins = parent.partitions.grouped(groupSize)
+ partitionBins.map { partitions =>
+ val group = new PartitionGroup()
+ group.partitions ++= partitions
+ group
+ }.toArray
+ }
+}
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala
new file mode 100644
index 0000000000..2ef9955052
--- /dev/null
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+
+case class OrderAwareCoalesceExec(
+ numPartitions: Int,
+ coalescer: PartitionCoalescer,
+ child: SparkPlan) extends UnaryExecNode {
+
+ override def output: Seq[Attribute] = child.output
+
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def outputPartitioning: Partitioning = {
+ if (numPartitions == 1) SinglePartition else UnknownPartitioning(numPartitions)
+ }
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ val result = child.execute()
+ if (numPartitions == 1 && result.getNumPartitions < 1) {
+ // make sure we don't output an RDD with 0 partitions,
+ // when claiming that we have a `SinglePartition`
+ // see CoalesceExec in Spark
+ new CoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)
+ } else {
+ result.coalesce(numPartitions, shuffle = false, Some(coalescer))
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+ copy(child = newChild)
+ }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bf4bef74c3..0cb9103643 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -894,6 +894,15 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
() -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid rewrite job order name: foo");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ basicRewrite(table)
+ .sort(SortOrder.builderFor(table.schema()).asc("c2").build())
+ .option(SparkShufflingDataRewriter.SHUFFLE_PARTITIONS_PER_FILE, "5")
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("requires enabling Iceberg Spark session extensions");
}
@Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
index 6800ffd404..055e5be681 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
@@ -261,6 +261,7 @@ public class TestSparkFileRewriter extends SparkTestBase {
Assert.assertEquals(
"Rewriter must report all supported options",
ImmutableSet.of(
+ SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
@@ -281,6 +282,7 @@ public class TestSparkFileRewriter extends SparkTestBase {
Assert.assertEquals(
"Rewriter must report all supported options",
ImmutableSet.of(
+ SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,