Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/16 05:10:08 UTC
[iceberg] branch master updated: Spark 3.3: Change default distribution modes (#6828)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a6ad1d1312 Spark 3.3: Change default distribution modes (#6828)
a6ad1d1312 is described below
commit a6ad1d1312bd1a0937bf7c9297f8bd553ab050a0
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Feb 15 21:10:02 2023 -0800
Spark 3.3: Change default distribution modes (#6828)
---
.../extensions/TestRewriteDataFilesProcedure.java | 10 +-
.../org/apache/iceberg/spark/SparkWriteConf.java | 21 +-
.../TestSparkDistributionAndOrderingUtil.java | 227 ++++++++++++++++++++-
.../spark/actions/TestRewriteManifestsAction.java | 8 +-
.../spark/source/TestIcebergSourceTablesBase.java | 2 +
.../iceberg/spark/source/TestPartitionValues.java | 1 +
.../iceberg/spark/source/TestSparkDataWrite.java | 7 +-
7 files changed, 260 insertions(+), 16 deletions(-)
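In short: SparkWriteConf previously defaulted to RANGE for sorted tables and NONE otherwise; with this commit it picks RANGE for sorted tables, HASH for partitioned unsorted tables, and NONE only for unpartitioned unsorted tables. Copy-on-write MERGE on partitioned tables and position-delta MERGE likewise default to hash clustering when no mode is configured. Tables that relied on the old behavior can opt out by setting the distribution mode explicitly; a minimal sketch (the session and table names are placeholders, not part of this commit):

    // Assumes an existing SparkSession `spark` and an Iceberg table `db.sample`.
    spark.sql(
        "ALTER TABLE db.sample "
            + "SET TBLPROPERTIES ('write.distribution-mode' = 'none')");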
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 4126f35919..d13e0967b6 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.NamedReference;
@@ -661,8 +662,13 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
private void createPartitionTable() {
sql(
- "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg PARTITIONED BY (c2)",
- tableName);
+ "CREATE TABLE %s (c1 int, c2 string, c3 string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c2) "
+ + "TBLPROPERTIES ('%s' '%s')",
+ tableName,
+ TableProperties.WRITE_DISTRIBUTION_MODE,
+ TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}
private void insertData(int filesCount) {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 7a7b0d0bbc..b2fbb1fa78 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -214,7 +214,7 @@ public class SparkWriteConf {
DistributionMode mode = DistributionMode.fromName(modeName);
return adjustWriteDistributionMode(mode);
} else {
- return table.sortOrder().isSorted() ? RANGE : NONE;
+ return defaultWriteDistributionMode();
}
}
@@ -228,6 +228,16 @@ public class SparkWriteConf {
}
}
+ private DistributionMode defaultWriteDistributionMode() {
+ if (table.sortOrder().isSorted()) {
+ return RANGE;
+ } else if (table.spec().isPartitioned()) {
+ return HASH;
+ } else {
+ return NONE;
+ }
+ }
+
public DistributionMode deleteDistributionMode() {
String deleteModeName =
confParser
@@ -261,6 +271,10 @@ public class SparkWriteConf {
if (mergeModeName != null) {
DistributionMode mergeMode = DistributionMode.fromName(mergeModeName);
return adjustWriteDistributionMode(mergeMode);
+
+ } else if (table.spec().isPartitioned()) {
+ return HASH;
+
} else {
return distributionMode();
}
@@ -272,8 +286,9 @@ public class SparkWriteConf {
.stringConf()
.option(SparkWriteOptions.DISTRIBUTION_MODE)
.tableProperty(TableProperties.MERGE_DISTRIBUTION_MODE)
- .parseOptional();
- return mergeModeName != null ? DistributionMode.fromName(mergeModeName) : distributionMode();
+ .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)
+ .parse();
+ return DistributionMode.fromName(mergeModeName);
}
public boolean useTableDistributionAndOrdering() {
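The new defaultWriteDistributionMode() above only applies when neither the distribution-mode write option nor the table property is set, so a single job can still pin a mode per write. A sketch of the per-write override, using the same option the updated tests below rely on (df and tableLocation are placeholders):

    // Request the old default (no redistribution) for this one append only.
    df.write()
        .format("iceberg")
        .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
        .mode("append")
        .save(tableLocation);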
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 7a5fdad5a5..c6b1eaeceb 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -191,13 +191,17 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ Expression[] expectedClustering =
+ new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
};
- checkWriteDistributionAndOrdering(table, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
+ checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
}
@Test
@@ -1050,18 +1054,27 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
//
// PARTITIONED BY date, days(ts) UNORDERED
// -------------------------------------------------------------------------
- // merge mode is NOT SET -> use write distribution and ordering
+ // merge mode is NOT SET -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts)
// merge mode is NONE -> unspecified distribution + LOCALLY ORDERED BY date, days(ts)
// merge mode is HASH -> CLUSTER BY date, days(ts) + LOCALLY ORDER BY date, days(ts)
// merge mode is RANGE -> ORDER BY date, days(ts)
//
// PARTITIONED BY date ORDERED BY id
// -------------------------------------------------------------------------
- // merge mode is NOT SET -> use write distribution and ordering
+ // merge mode is NOT SET -> CLUSTER BY date + LOCALLY ORDER BY date, id
// merge mode is NONE -> unspecified distribution + LOCALLY ORDERED BY date, id
// merge mode is HASH -> CLUSTER BY date + LOCALLY ORDER BY date, id
// merge mode is RANGE -> ORDERED BY date, id
+ @Test
+ public void testDefaultCopyOnWriteMergeUnpartitionedUnsortedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ checkCopyOnWriteDistributionAndOrdering(table, MERGE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
+ }
+
@Test
public void testNoneCopyOnWriteMergeUnpartitionedUnsortedTable() {
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1095,6 +1108,25 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(table, MERGE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
}
+ @Test
+ public void testDefaultCopyOnWriteMergeUnpartitionedSortedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").asc("data").commit();
+
+ SortOrder[] expectedOrdering =
+ new SortOrder[] {
+ Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("data"), SortDirection.ASCENDING)
+ };
+
+ Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+
+ checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+ }
+
@Test
public void testNoneCopyOnWriteMergeUnpartitionedSortedTable() {
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1156,6 +1188,29 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
}
+ @Test
+ public void testDefaultCopyOnWriteMergePartitionedUnsortedTable() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+ + "USING iceberg "
+ + "PARTITIONED BY (date, days(ts))",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Expression[] expectedClustering =
+ new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+ SortOrder[] expectedOrdering =
+ new SortOrder[] {
+ Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
+ };
+
+ checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+ }
+
@Test
public void testNoneCopyOnWriteMergePartitionedUnsortedTable() {
sql(
@@ -1226,6 +1281,30 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
}
+ @Test
+ public void testDefaultCopyOnWriteMergePartitionedSortedTable() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+ + "USING iceberg "
+ + "PARTITIONED BY (date)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().desc("id").commit();
+
+ Expression[] expectedClustering = new Expression[] {Expressions.identity("date")};
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+ SortOrder[] expectedOrdering =
+ new SortOrder[] {
+ Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("id"), SortDirection.DESCENDING)
+ };
+
+ checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+ }
+
@Test
public void testNoneCopyOnWriteMergePartitionedSortedTable() {
sql(
@@ -1585,9 +1664,10 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
//
// UNPARTITIONED UNORDERED
// -------------------------------------------------------------------------
- // merge mode is NOT SET -> use write mode
- // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file,
- // _pos
+ // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, _file +
+ // LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+ // merge mode is NONE -> unspecified distribution +
+ // LOCALLY ORDER BY _spec_id, _partition, _file, _pos
// merge mode is HASH -> CLUSTER BY _spec_id, _partition, _file +
// LOCALLY ORDER BY _spec_id, _partition, _file, _pos
// merge mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file +
@@ -1595,7 +1675,8 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
//
// UNPARTITIONED ORDERED BY id, data
// -------------------------------------------------------------------------
- // merge mode is NOT SET -> use write mode
+ // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, _file +
+ // LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data
// merge mode is NONE -> unspecified distribution +
// LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data
// merge mode is HASH -> CLUSTER BY _spec_id, _partition, _file +
@@ -1605,7 +1686,8 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
//
// PARTITIONED BY date, days(ts) UNORDERED
// -------------------------------------------------------------------------
- // merge mode is NOT SET -> use write mode
+ // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, date, days(ts) +
+ // LOCALLY ORDER BY _spec_id, _partition, _file, _pos, date, days(ts)
// merge mode is NONE -> unspecified distribution +
// LOCALLY ORDERED BY _spec_id, _partition, _file, _pos, date, days(ts)
// merge mode is HASH -> CLUSTER BY _spec_id, _partition, date, days(ts) +
@@ -1615,7 +1697,8 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
//
// PARTITIONED BY date ORDERED BY id
// -------------------------------------------------------------------------
- // merge mode is NOT SET -> use write mode
+ // merge mode is NOT SET -> CLUSTER BY _spec_id, _partition, date +
+ // LOCALLY ORDER BY _spec_id, _partition, _file, _pos, date, id
// merge mode is NONE -> unspecified distribution +
// LOCALLY ORDERED BY _spec_id, _partition, _file, _pos, date, id
// merge mode is HASH -> CLUSTER BY _spec_id, _partition, date +
@@ -1623,6 +1706,24 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
// merge mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file, date, id
// LOCALLY ORDERED BY _spec_id, _partition, _file, _pos, date, id
+ @Test
+ public void testDefaultPositionDeltaMergeUnpartitionedUnsortedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Expression[] expectedClustering =
+ new Expression[] {
+ Expressions.column(MetadataColumns.SPEC_ID.name()),
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+ Expressions.column(MetadataColumns.FILE_PATH.name())
+ };
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, MERGE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
@Test
public void testNonePositionDeltaMergeUnpartitionedUnsortedTable() {
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1678,6 +1779,39 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
table, MERGE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
}
+ @Test
+ public void testDefaultPositionDeltaMergeUnpartitionedSortedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").asc("data").commit();
+
+ Expression[] expectedClustering =
+ new Expression[] {
+ Expressions.column(MetadataColumns.SPEC_ID.name()),
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+ Expressions.column(MetadataColumns.FILE_PATH.name())
+ };
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+ SortOrder[] expectedOrdering =
+ new SortOrder[] {
+ Expressions.sort(
+ Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("data"), SortDirection.ASCENDING)
+ };
+
+ checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+ }
+
@Test
public void testNonePositionDeltaMergeUnpartitionedSortedTable() {
sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
@@ -1781,6 +1915,42 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
}
+ @Test
+ public void testDefaultPositionDeltaMergePartitionedUnsortedTable() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+ + "USING iceberg "
+ + "PARTITIONED BY (date, days(ts))",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Expression[] expectedClustering =
+ new Expression[] {
+ Expressions.column(MetadataColumns.SPEC_ID.name()),
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+ Expressions.identity("date"),
+ Expressions.days("ts")
+ };
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+ SortOrder[] expectedOrdering =
+ new SortOrder[] {
+ Expressions.sort(
+ Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
+ };
+
+ checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+ }
+
@Test
public void testNonePositionDeltaMergePartitionedUnsortedTable() {
sql(
@@ -1923,6 +2093,45 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
table, MERGE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
}
+ @Test
+ public void testDefaultPositionDeltaMergePartitionedSortedTable() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
+ + "USING iceberg "
+ + "PARTITIONED BY (date, bucket(8, data))",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.replaceSortOrder().asc("id").commit();
+
+ Expression[] expectedClustering =
+ new Expression[] {
+ Expressions.column(MetadataColumns.SPEC_ID.name()),
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME),
+ Expressions.identity("date"),
+ Expressions.bucket(8, "data")
+ };
+ Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+
+ SortOrder[] expectedOrdering =
+ new SortOrder[] {
+ Expressions.sort(
+ Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+ Expressions.sort(
+ Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.bucket(8, "data"), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
+ };
+
+ checkPositionDeltaDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
+ }
+
@Test
public void testHashPositionDeltaMergePartitionedSortedTable() {
sql(
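The expectations above follow from the precedence in SparkWriteConf: the merge-specific option or property wins, then (for copy-on-write) hash on partitioned tables, then the general write mode. Merges can therefore be tuned independently of plain writes; a sketch using the property behind TableProperties.MERGE_DISTRIBUTION_MODE (db.sample is a placeholder):

    // Force range distribution for MERGE commands only; plain writes keep the default.
    spark.sql(
        "ALTER TABLE db.sample "
            + "SET TBLPROPERTIES ('write.merge.distribution-mode' = 'range')");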
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 9382846e1e..4aafb72ace 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -559,7 +560,12 @@ public class TestRewriteManifestsAction extends SparkTestBase {
}
private void writeDF(Dataset<Row> df) {
- df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+ df.select("c1", "c2", "c3")
+ .write()
+ .format("iceberg")
+ .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
+ .mode("append")
+ .save(tableLocation);
}
private long computeManifestEntrySizeBytes(List<ManifestFile> manifests) {
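The test pins the mode to none, presumably to keep the number of files each append produces, and hence the manifest counts asserted above, stable under the new hash default. The same per-write override also works through the DataFrameWriterV2 API; a sketch (the table name is a placeholder, and append() declares NoSuchTableException):

    // Equivalent override through the v2 writer.
    df.writeTo("db.sample")
        .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
        .append();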
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 86bb980953..0f6ae3f20d 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -65,6 +65,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
@@ -1010,6 +1011,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
.write()
.format("iceberg")
.mode("append")
+ .option(SparkWriteOptions.DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
.save(loadLocation(tableIdentifier));
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index c231afd5f8..ad0984ef42 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -491,6 +491,7 @@ public class TestPartitionValues {
.option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
.load(baseLocation)
.select("struct.innerName")
+ .orderBy("struct.innerName")
.as(Encoders.STRING())
.collectAsList();
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 66247c24e2..dac1c150cd 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
@@ -41,6 +42,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
@@ -524,7 +526,10 @@ public class TestSparkDataWrite {
HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
- Table table = tables.create(SCHEMA, spec, location.toString());
+ Map<String, String> properties =
+ ImmutableMap.of(
+ TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+ Table table = tables.create(SCHEMA, spec, properties, location.toString());
List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
for (int i = 0; i < 2000; i++) {