You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/14 00:18:57 UTC
[incubator-iceberg] branch master updated: Allow read split behavior overrides (#372)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1b0b9c2 Allow read split behavior overrides (#372)
1b0b9c2 is described below
commit 1b0b9c27f90d710bbc95b07c41c99b0eaf36f045
Author: Xabriel J. Collazo Mojica <xc...@adobe.com>
AuthorDate: Tue Aug 13 17:18:53 2019 -0700
Allow read split behavior overrides (#372)
---
.../main/java/org/apache/iceberg/TableScan.java | 10 ++++
.../java/org/apache/iceberg/BaseTableScan.java | 61 ++++++++++++++++------
.../java/org/apache/iceberg/DataFilesTable.java | 11 ++--
.../java/org/apache/iceberg/DataTableScan.java | 10 ++--
.../org/apache/iceberg/ManifestEntriesTable.java | 11 ++--
.../java/org/apache/iceberg/StaticTableScan.java | 10 ++--
.../java/org/apache/iceberg/TestSplitPlanning.java | 40 ++++++++++++++
site/docs/configuration.md | 13 +++--
.../org/apache/iceberg/spark/source/Reader.java | 22 ++++++++
.../spark/source/TestDataSourceOptions.java | 28 ++++++++++
10 files changed, 180 insertions(+), 36 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 8259fd1..5bbcf13 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -59,6 +59,16 @@ public interface TableScan {
TableScan asOfTime(long timestampMillis);
/**
+ * Create a new {@link TableScan} from this scan's configuration that will override the {@link Table}'s behavior based
+ * on the incoming pair. Unknown properties will be ignored.
+ *
+ * @param property name of the table property to be overridden
+ * @param value value to override with
+ * @return a new scan based on this with overridden behavior
+ */
+ TableScan option(String property, String value);
+
+ /**
* Create a new {@link TableScan} from this with the schema as its projection.
*
* @param schema a projection schema
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index b0c52d7..84d852c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.time.Instant;
import java.time.LocalDateTime;
@@ -59,14 +60,15 @@ abstract class BaseTableScan implements TableScan {
private final boolean caseSensitive;
private final boolean colStats;
private final Collection<String> selectedColumns;
+ private final ImmutableMap<String, String> options;
protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
- this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null);
+ this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null, ImmutableMap.of());
}
protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
Expression rowFilter, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns) {
+ Collection<String> selectedColumns, ImmutableMap<String, String> options) {
this.ops = ops;
this.table = table;
this.snapshotId = snapshotId;
@@ -75,6 +77,7 @@ abstract class BaseTableScan implements TableScan {
this.caseSensitive = caseSensitive;
this.colStats = colStats;
this.selectedColumns = selectedColumns;
+ this.options = options != null ? options : ImmutableMap.of();
}
@SuppressWarnings("checkstyle:HiddenField")
@@ -83,7 +86,8 @@ abstract class BaseTableScan implements TableScan {
@SuppressWarnings("checkstyle:HiddenField")
protected abstract TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns);
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options);
@SuppressWarnings("checkstyle:HiddenField")
protected abstract CloseableIterable<FileScanTask> planFiles(
@@ -100,7 +104,8 @@ abstract class BaseTableScan implements TableScan {
"Cannot override snapshot, already set to id=%s", scanSnapshotId);
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);
- return newRefinedScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ return newRefinedScan(
+ ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -124,30 +129,41 @@ abstract class BaseTableScan implements TableScan {
}
@Override
+ public TableScan option(String property, String value) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ builder.putAll(options);
+ builder.put(property, value);
+
+ return newRefinedScan(
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, builder.build());
+ }
+
+ @Override
public TableScan project(Schema projectedSchema) {
return newRefinedScan(
- ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns);
+ ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
@Override
public TableScan caseSensitive(boolean scanCaseSensitive) {
- return newRefinedScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
+ return newRefinedScan(
+ ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns, options);
}
@Override
public TableScan includeColumnStats() {
- return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
+ return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns, options);
}
@Override
public TableScan select(Collection<String> columns) {
- return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
+ return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns, options);
}
@Override
public TableScan filter(Expression expr) {
- return newRefinedScan(
- ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats, selectedColumns);
+ return newRefinedScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats,
+ selectedColumns, options);
}
@Override
@@ -176,11 +192,26 @@ abstract class BaseTableScan implements TableScan {
@Override
public CloseableIterable<CombinedScanTask> planTasks() {
- long splitSize = targetSplitSize(ops);
- int lookback = ops.current().propertyAsInt(
- TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
- long openFileCost = ops.current().propertyAsLong(
- TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+ long splitSize;
+ if (options.containsKey(TableProperties.SPLIT_SIZE)) {
+ splitSize = Long.parseLong(options.get(TableProperties.SPLIT_SIZE));
+ } else {
+ splitSize = targetSplitSize(ops);
+ }
+ int lookback;
+ if (options.containsKey(TableProperties.SPLIT_LOOKBACK)) {
+ lookback = Integer.parseInt(options.get(TableProperties.SPLIT_LOOKBACK));
+ } else {
+ lookback = ops.current().propertyAsInt(
+ TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
+ }
+ long openFileCost;
+ if (options.containsKey(TableProperties.SPLIT_OPEN_FILE_COST)) {
+ openFileCost = Long.parseLong(options.get(TableProperties.SPLIT_OPEN_FILE_COST));
+ } else {
+ openFileCost = ops.current().propertyAsLong(
+ TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+ }
Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 6958992..21d8da6 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.util.Collection;
import org.apache.iceberg.avro.Avro;
@@ -83,17 +84,19 @@ class DataFilesTable extends BaseMetadataTable {
private FilesTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema,
+ ImmutableMap<String, String> options) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
this.fileSchema = fileSchema;
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options) {
return new FilesTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema);
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema, options);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 4e9e82f..bbb50d3 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.List;
@@ -54,16 +55,17 @@ public class DataTableScan extends BaseTableScan {
protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
Expression rowFilter, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options) {
return new DataTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index f3fe85e..9f91c12 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.util.Collection;
import org.apache.iceberg.avro.Avro;
@@ -82,16 +83,18 @@ class ManifestEntriesTable extends BaseMetadataTable {
private EntriesTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options) {
return new EntriesTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
index c29c14b..2f1f148 100644
--- a/core/src/main/java/org/apache/iceberg/StaticTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.function.Function;
import org.apache.iceberg.expressions.Expression;
@@ -37,8 +38,8 @@ class StaticTableScan extends BaseTableScan {
private StaticTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- Function<StaticTableScan, DataTask> buildTask) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ Function<StaticTableScan, DataTask> buildTask, ImmutableMap<String, String> options) {
+ super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
this.buildTask = buildTask;
}
@@ -50,9 +51,10 @@ class StaticTableScan extends BaseTableScan {
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options) {
return new StaticTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask);
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask, options);
}
@Override
diff --git a/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java b/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
index c8f0721..0066afd 100644
--- a/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Before;
@@ -101,6 +102,45 @@ public class TestSplitPlanning {
Assert.assertEquals(1, Iterables.size(table.newScan().planTasks()));
}
+ @Test
+ public void testSplitPlanningWithOverridenSize() {
+ List<DataFile> files128Mb = newFiles(4, 128 * 1024 * 1024);
+ appendFiles(files128Mb);
+ // we expect 2 bins since we are overriding split size in scan with 256MB
+ TableScan scan = table.newScan()
+ .option(TableProperties.SPLIT_SIZE, String.valueOf(256L * 1024 * 1024));
+ Assert.assertEquals(2, Iterables.size(scan.planTasks()));
+ }
+
+ @Test
+ public void testSplitPlanningWithOverridenLookback() {
+ List<DataFile> files120Mb = newFiles(1, 120 * 1024 * 1024);
+ List<DataFile> file128Mb = newFiles(1, 128 * 1024 * 1024);
+ Iterable<DataFile> files = Iterables.concat(files120Mb, file128Mb);
+ appendFiles(files);
+ // we expect 2 bins from non-overriden table properties
+ TableScan scan = table.newScan()
+ .option(TableProperties.SPLIT_LOOKBACK, "1");
+ CloseableIterable<CombinedScanTask> tasks = scan.planTasks();
+ Assert.assertEquals(2, Iterables.size(tasks));
+
+ // since lookback was overridden to 1, we expect the first bin to be the largest of the two.
+ CombinedScanTask combinedScanTask = tasks.iterator().next();
+ FileScanTask task = combinedScanTask.files().iterator().next();
+ Assert.assertEquals(128 * 1024 * 1024, task.length());
+ }
+
+ @Test
+ public void testSplitPlanningWithOverridenOpenCostSize() {
+ List<DataFile> files16Mb = newFiles(16, 16 * 1024 * 1024);
+ appendFiles(files16Mb);
+ // we expect 4 bins since we are overriding open file cost in scan with a cost of 32MB
+ // we can fit at most 128Mb/32Mb = 4 files per bin
+ TableScan scan = table.newScan()
+ .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(32L * 1024 * 1024));
+ Assert.assertEquals(4, Iterables.size(scan.planTasks()));
+ }
+
private void appendFiles(Iterable<DataFile> files) {
AppendFiles appendFiles = table.newAppend();
files.forEach(appendFiles::appendFile);
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 3b54405..3e939fe 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -10,7 +10,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| ---------------------------- | ------------------ | ------------------------------------------------------ |
| read.split.target-size | 134217728 (128 MB) | Target size when combining input splits |
| read.split.planning-lookback | 10 | Number of bins to consider when combining input splits |
-
+| read.split.open-file-cost | 4194304 (4 MB) | The estimated cost to open a file, used as a minimum weight when combining splits. |
### Write properties
@@ -52,10 +52,13 @@ spark.read
.load("db.table")
```
-| Spark option | Default | Description |
-| --------------- | -------- | ----------------------------------------------------------------------------------------- |
-| snapshot-id | (latest) | Snapshot ID of the table snapshot to read |
-| as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
+| Spark option | Default | Description |
+| --------------- | --------------------- | ----------------------------------------------------------------------------------------- |
+| snapshot-id | (latest) | Snapshot ID of the table snapshot to read |
+| as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
+| split-size | As per table property | Overrides this table's read.split.target-size |
+| lookback | As per table property | Overrides this table's read.split.planning-lookback |
+| file-open-cost | As per table property | Overrides this table's read.split.open-file-cost |
### Write options
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 91359ff..43c966a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
@@ -99,6 +100,9 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private final Table table;
private final Long snapshotId;
private final Long asOfTimestamp;
+ private final Long splitSize;
+ private final Integer splitLookback;
+ private final Long splitOpenFileCost;
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
private final boolean caseSensitive;
@@ -119,6 +123,12 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
throw new IllegalArgumentException(
"Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
}
+
+ // look for split behavior overrides in options
+ this.splitSize = options.get("split-size").map(Long::parseLong).orElse(null);
+ this.splitLookback = options.get("lookback").map(Integer::parseInt).orElse(null);
+ this.splitOpenFileCost = options.get("file-open-cost").map(Long::parseLong).orElse(null);
+
this.schema = table.schema();
this.fileIo = table.io();
this.encryptionManager = table.encryption();
@@ -233,6 +243,18 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
scan = scan.asOfTime(asOfTimestamp);
}
+ if (splitSize != null) {
+ scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+ }
+
+ if (splitLookback != null) {
+ scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
+ }
+
+ if (splitOpenFileCost != null) {
+ scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+ }
+
if (filterExpressions != null) {
for (Expression filter : filterExpressions) {
scan = scan.filter(filter);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index cbb5351..6d71e04 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -170,4 +170,32 @@ public class TestDataSourceOptions {
sparkHadoopConf.set("fs.default.name", originalDefaultFS);
}
}
+
+ @Test
+ public void testSplitOptionsOverridesTableProperties() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.SPLIT_SIZE, String.valueOf(128L * 1024 * 1024)); // 128Mb
+ tables.create(SCHEMA, spec, options, tableLocation);
+
+ List<SimpleRecord> expectedRecords = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b")
+ );
+ Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+ originalDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation);
+
+ Dataset<Row> resultDf = spark.read()
+ .format("iceberg")
+ .option("split-size", String.valueOf(562L)) // 562 bytes is the size of SimpleRecord(1,"a")
+ .load(tableLocation);
+
+ Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions());
+ }
}