Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/16 05:10:08 UTC

[iceberg] branch master updated: Spark 3.3: Change default distribution modes (#6828)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a6ad1d1312 Spark 3.3: Change default distribution modes (#6828)
a6ad1d1312 is described below

commit a6ad1d1312bd1a0937bf7c9297f8bd553ab050a0
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Feb 15 21:10:02 2023 -0800

    Spark 3.3: Change default distribution modes (#6828)
---
 .../extensions/TestRewriteDataFilesProcedure.java  |  10 +-
 .../org/apache/iceberg/spark/SparkWriteConf.java   |  21 +-
 .../TestSparkDistributionAndOrderingUtil.java      | 227 ++++++++++++++++++++-
 .../spark/actions/TestRewriteManifestsAction.java  |   8 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |   2 +
 .../iceberg/spark/source/TestPartitionValues.java  |   1 +
 .../iceberg/spark/source/TestSparkDataWrite.java   |   7 +-
 7 files changed, 260 insertions(+), 16 deletions(-)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 4126f35919..d13e0967b6 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.NamedReference;
@@ -661,8 +662,13 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
 
   private void createPartitionTable() {
     sql(
-        "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg PARTITIONED BY (c2)",
-        tableName);
+        "CREATE TABLE %s (c1 int, c2 string, c3 string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (c2) "
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName,
+        TableProperties.WRITE_DISTRIBUTION_MODE,
+        TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
   }
 
   private void insertData(int filesCount) {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 7a7b0d0bbc..b2fbb1fa78 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -214,7 +214,7 @@ public class SparkWriteConf {
       DistributionMode mode = DistributionMode.fromName(modeName);
       return adjustWriteDistributionMode(mode);
     } else {
-      return table.sortOrder().isSorted() ? RANGE : NONE;
+      return defaultWriteDistributionMode();
     }
   }
 
@@ -228,6 +228,16 @@ public class SparkWriteConf {
     }
   }
 
+  private DistributionMode defaultWriteDistributionMode() {
+    if (table.sortOrder().isSorted()) {
+      return RANGE;
+    } else if (table.spec().isPartitioned()) {
+      return HASH;
+    } else {
+      return NONE;
+    }
+  }
+
   public DistributionMode deleteDistributionMode() {
     String deleteModeName =
         confParser
@@ -261,6 +271,10 @@ public class SparkWriteConf {
     if (mergeModeName != null) {
       DistributionMode mergeMode = DistributionMode.fromName(mergeModeName);
       return adjustWriteDistributionMode(mergeMode);
+
+    } else if (table.spec().isPartitioned()) {
+      return HASH;
+
     } else {
       return distributionMode();
     }
@@ -272,8 +286,9 @@ public class SparkWriteConf {
             .stringConf()
             .option(SparkWriteOptions.DISTRIBUTION_MODE)
             .tableProperty(TableProperties.MERGE_DISTRIBUTION_MODE)
-            .parseOptional();
-    return mergeModeName != null ? DistributionMode.fromName(mergeModeName) : distributionMode();
+            .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)
+            .parse();
+    return DistributionMode.fromName(mergeModeName);
   }
 
   public boolean useTableDistributionAndOrdering() {
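
The net effect of the SparkWriteConf change above: unsorted partitioned tables now default to hash distribution, sorted tables keep range distribution, and only unpartitioned unsorted tables still default to none; MERGE paths likewise default to hash when the table is partitioned. The snippet below is an illustrative sketch, not part of this commit, of how a job could pin the previous behavior. It assumes a SparkSession named spark, a DataFrame named df, a table location tableLocation, and a placeholder table db.sample, and it uses the same property and write option the updated tests in this patch rely on.

    // Restore the old default for all writes to one table via SQL, using the
    // 'write.distribution-mode' table property (TableProperties.WRITE_DISTRIBUTION_MODE).
    spark.sql("ALTER TABLE db.sample SET TBLPROPERTIES ('write.distribution-mode' = 'none')");

    // Or override it for a single write, as the updated tests below do;
    // TableProperties.MERGE_DISTRIBUTION_MODE can be set the same way to target MERGE only.
    df.write()
        .format("iceberg")
        .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
        .mode("append")
        .save(tableLocation);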
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 7a5fdad5a5..c6b1eaeceb 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -191,13 +191,17 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    Expression[] expectedClustering =
+        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
           Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
         };
 
-    checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
+    checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
   }
 
   @Test
@@ -1050,18 +1054,27 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   //
   // PARTITIONED BY date, days(ts) UNORDERED
   // -------------------------------------------------------------------------
-  // merge mode is NOT SET -> use write distribution and ordering
+  // merge mode is NOT SET -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts)
   // merge mode is NONE -> unspecified distribution + LOCALLY ORDERED BY date, days(ts)
   // merge mode is HASH -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts)
   // merge mode is RANGE -> ORDER BY date, days(ts)
   //
   // PARTITIONED BY date ORDERED BY id
   // -------------------------------------------------------------------------
-  // merge mode is NOT SET -> use write distribution and ordering
+  // merge mode is NOT SET -> CLUSTER BY date + LOCALLY ORDER BY date, id
   // merge mode is NONE -> unspecified distribution + LOCALLY ORDERED BY date, id
   // merge mode is HASH -> CLUSTER BY date + LOCALLY ORDER BY date, id
   // merge mode is RANGE -> ORDERED BY date, id
 
+  @Test
+  public void testDefaultCopyOnWriteMergeUnpartitionedUnsortedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    checkCopyOnWriteDistributionAndOrdering(table, MERGE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
+  }
+
   @Test
   public void testNoneCopyOnWriteMergeUnpartitionedUnsortedTable() {
     sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1095,6 +1108,25 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
   }
 
+  @Test
+  public void testDefaultCopyOnWriteMergeUnpartitionedSortedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").asc("data").commit();
+
+    SortOrder[] expectedOrdering =
+        new SortOrder[] {
+          Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("data"), SortDirection.ASCENDING)
+        };
+
+    Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+
+    checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+  }
+
   @Test
   public void testNoneCopyOnWriteMergeUnpartitionedSortedTable() {
     sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1156,6 +1188,29 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  @Test
+  public void testDefaultCopyOnWriteMergePartitionedUnsortedTable() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+            + "USING iceberg "
+            + "PARTITIONED BY (date, days(ts))",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Expression[] expectedClustering =
+        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+    SortOrder[] expectedOrdering =
+        new SortOrder[] {
+          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
+        };
+
+    checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+  }
+
   @Test
   public void testNoneCopyOnWriteMergePartitionedUnsortedTable() {
     sql(
@@ -1226,6 +1281,30 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  @Test
+  public void testDefaultCopyOnWriteMergePartitionedSortedTable() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+            + "USING iceberg "
+            + "PARTITIONED BY (date)",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().desc("id").commit();
+
+    Expression[] expectedClustering = new Expression[] {Expressions.identity("date")};
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+    SortOrder[] expectedOrdering =
+        new SortOrder[] {
+          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("id"), SortDirection.DESCENDING)
+        };
+
+    checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+  }
+
   @Test
   public void testNoneCopyOnWriteMergePartitionedSortedTable() {
     sql(
@@ -1585,9 +1664,10 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   //
   // UNPARTITIONED UNORDERED
   // -------------------------------------------------------------------------
-  // merge mode is NOT SET -> use write mode
-  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file,
-  // _pos
+  // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, _file +
+  //                          LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos
   // merge mode is HASH -> CLUSTER BY _spec_id, _partition, _file +
   //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos
   // merge mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file +
@@ -1595,7 +1675,8 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   //
   // UNPARTITIONED ORDERED BY id, data
   // -------------------------------------------------------------------------
-  // merge mode is NOT SET -> use write mode
+  // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, _file +
+  //                          LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data
   // merge mode is NONE -> unspecified distribution +
   //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data
   // merge mode is HASH -> CLUSTER BY _spec_id, _partition, _file +
@@ -1605,7 +1686,8 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   //
   // PARTITIONED BY date, days(ts) UNORDERED
   // -------------------------------------------------------------------------
-  // merge mode is NOT SET -> use write mode
+  // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, date, days(ts) +
+  //                          LOCALLY ORDER BY _spec_id, _partition, _file, _pos, date, days(ts)
   // merge mode is NONE -> unspecified distribution +
   //                       LOCALLY ORDERED BY _spec_id, _partition, _file, _pos, date, days(ts)
   // merge mode is HASH -> CLUSTER BY _spec_id, _partition, date, days(ts) +
@@ -1615,7 +1697,8 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   //
   // PARTITIONED BY date ORDERED BY id
   // -------------------------------------------------------------------------
-  // merge mode is NOT SET -> use write mode
+  // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, date +
+  //                          LOCALLY ORDER BY _spec_id, _partition, _file, _pos, date, id
   // merge mode is NONE -> unspecified distribution +
   //                       LOCALLY ORDERED BY _spec_id, _partition, _file, _pos, date, id
   // merge mode is HASH -> CLUSTER BY _spec_id, _partition, date +
@@ -1623,6 +1706,24 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   // merge mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file, date, id
   //                        LOCALLY ORDERED BY _spec_id, _partition, _file, _pos, date, id
 
+  @Test
+  public void testDefaultPositionDeltaMergeUnpartitionedUnsortedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Expression[] expectedClustering =
+        new Expression[] {
+          Expressions.column(MetadataColumns.SPEC_ID.name()),
+          Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+          Expressions.column(MetadataColumns.FILE_PATH.name())
+        };
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, MERGE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
   @Test
   public void testNonePositionDeltaMergeUnpartitionedUnsortedTable() {
     sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1678,6 +1779,39 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
         table, MERGE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  @Test
+  public void testDefaultPositionDeltaMergeUnpartitionedSortedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").asc("data").commit();
+
+    Expression[] expectedClustering =
+        new Expression[] {
+          Expressions.column(MetadataColumns.SPEC_ID.name()),
+          Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+          Expressions.column(MetadataColumns.FILE_PATH.name())
+        };
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+    SortOrder[] expectedOrdering =
+        new SortOrder[] {
+          Expressions.sort(
+              Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("data"), SortDirection.ASCENDING)
+        };
+
+    checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+  }
+
   @Test
   public void testNonePositionDeltaMergeUnpartitionedSortedTable() {
     sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1781,6 +1915,42 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
     checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  @Test
+  public void testDefaultPositionDeltaMergePartitionedUnsortedTable() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+            + "USING iceberg "
+            + "PARTITIONED BY (date, days(ts))",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Expression[] expectedClustering =
+        new Expression[] {
+          Expressions.column(MetadataColumns.SPEC_ID.name()),
+          Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+          Expressions.identity("date"),
+          Expressions.days("ts")
+        };
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+    SortOrder[] expectedOrdering =
+        new SortOrder[] {
+          Expressions.sort(
+              Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
+        };
+
+    checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+  }
+
   @Test
   public void testNonePositionDeltaMergePartitionedUnsortedTable() {
     sql(
@@ -1923,6 +2093,45 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
         table, MERGE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
   }
 
+  @Test
+  public void testDefaultPositionDeltaMergePartitionedSortedTable() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+            + "USING iceberg "
+            + "PARTITIONED BY (date, bucket(8, data))",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.replaceSortOrder().asc("id").commit();
+
+    Expression[] expectedClustering =
+        new Expression[] {
+          Expressions.column(MetadataColumns.SPEC_ID.name()),
+          Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+          Expressions.identity("date"),
+          Expressions.bucket(8, "data")
+        };
+    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+    SortOrder[] expectedOrdering =
+        new SortOrder[] {
+          Expressions.sort(
+              Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+          Expressions.sort(
+              Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.bucket(8, "data"), SortDirection.ASCENDING),
+          Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
+        };
+
+    checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+  }
+
   @Test
   public void testHashPositionDeltaMergePartitionedSortedTable() {
     sql(
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 9382846e1e..4aafb72ace 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
@@ -559,7 +560,12 @@ public class TestRewriteManifestsAction extends SparkTestBase {
   }
 
   private void writeDF(Dataset<Row> df) {
-    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
+        .mode("append")
+        .save(tableLocation);
   }
 
   private long computeManifestEntrySizeBytes(List<ManifestFile> manifests) {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 86bb980953..0f6ae3f20d 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -65,6 +65,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
@@ -1010,6 +1011,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .write()
         .format("iceberg")
         .mode("append")
+        .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
         .save(loadLocation(tableIdentifier));
 
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index c231afd5f8..ad0984ef42 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -491,6 +491,7 @@ public class TestPartitionValues {
             .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
             .load(baseLocation)
             .select("struct.innerName")
+            .orderBy("struct.innerName")
             .as(Encoders.STRING())
             .collectAsList();
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 66247c24e2..dac1c150cd 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
@@ -41,6 +42,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
@@ -524,7 +526,10 @@ public class TestSparkDataWrite {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
-    Table table = tables.create(SCHEMA, spec, location.toString());
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+    Table table = tables.create(SCHEMA, spec, properties, location.toString());
 
     List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
     for (int i = 0; i < 2000; i++) {
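
Taken together, the new tests above pin down the default selection. As a compact recap, here is an illustrative sketch, not code from this commit, that reuses the hypothetical fixtures the tests assume (sql(), tableName, validationCatalog, tableIdent) to show which default each table shape now gets.

    // Partitioned, unsorted table: writes now default to HASH (clustered by days(ts));
    // before this change, this case defaulted to NONE.
    sql(
        "CREATE TABLE %s (id BIGINT, data STRING, ts TIMESTAMP) "
            + "USING iceberg PARTITIONED BY (days(ts))",
        tableName);
    Table table = validationCatalog.loadTable(tableIdent);

    // Any table with a sort order: writes default to RANGE, as before.
    table.replaceSortOrder().asc("id").commit();

    // Unpartitioned, unsorted tables are the only shape that still defaults to NONE,
    // while position-delta MERGE now defaults to hash clustering over
    // _spec_id, _partition, _file regardless of the table shape.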