Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/14 00:18:57 UTC

[incubator-iceberg] branch master updated: Allow read split behavior overrides (#372)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b0b9c2  Allow read split behavior overrides (#372)
1b0b9c2 is described below

commit 1b0b9c27f90d710bbc95b07c41c99b0eaf36f045
Author: Xabriel J. Collazo Mojica <xc...@adobe.com>
AuthorDate: Tue Aug 13 17:18:53 2019 -0700

    Allow read split behavior overrides (#372)
---
 .../main/java/org/apache/iceberg/TableScan.java    | 10 ++++
 .../java/org/apache/iceberg/BaseTableScan.java     | 61 ++++++++++++++++------
 .../java/org/apache/iceberg/DataFilesTable.java    | 11 ++--
 .../java/org/apache/iceberg/DataTableScan.java     | 10 ++--
 .../org/apache/iceberg/ManifestEntriesTable.java   | 11 ++--
 .../java/org/apache/iceberg/StaticTableScan.java   | 10 ++--
 .../java/org/apache/iceberg/TestSplitPlanning.java | 40 ++++++++++++++
 site/docs/configuration.md                         | 13 +++--
 .../org/apache/iceberg/spark/source/Reader.java    | 22 ++++++++
 .../spark/source/TestDataSourceOptions.java        | 28 ++++++++++
 10 files changed, 180 insertions(+), 36 deletions(-)

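The heart of the change is a new TableScan.option(String, String) builder method, which lets a caller override the table's read.split.* properties for a single scan without touching table metadata; unknown property names are ignored. A minimal usage sketch against the API added in this commit (the "table" variable is assumed to be an already-loaded org.apache.iceberg.Table):

    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.io.CloseableIterable;

    // Build a scan that plans with 256 MB target splits instead of the
    // table's configured read.split.target-size. Like the other TableScan
    // builder methods, option() returns a new scan; nothing is mutated.
    TableScan scan = table.newScan()
        .option(TableProperties.SPLIT_SIZE, String.valueOf(256L * 1024 * 1024));

    // Enclosing method is assumed to declare 'throws IOException'.
    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
      tasks.forEach(task ->
          System.out.println(task.files().size() + " file(s) in this task"));
    }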
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 8259fd1..5bbcf13 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -59,6 +59,16 @@ public interface TableScan {
   TableScan asOfTime(long timestampMillis);
 
   /**
+   * Create a new {@link TableScan} from this scan's configuration that will override the {@link Table}'s behavior based
+   * on the incoming pair. Unknown properties will be ignored.
+   *
+   * @param property name of the table property to be overridden
+   * @param value value to override with
+   * @return a new scan based on this with overridden behavior
+   */
+  TableScan option(String property, String value);
+
+  /**
    * Create a new {@link TableScan} from this with the schema as its projection.
    *
    * @param schema a projection schema
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index b0c52d7..84d852c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -59,14 +60,15 @@ abstract class BaseTableScan implements TableScan {
   private final boolean caseSensitive;
   private final boolean colStats;
   private final Collection<String> selectedColumns;
+  private final ImmutableMap<String, String> options;
 
   protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
-    this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null);
+    this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null, ImmutableMap.of());
   }
 
   protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
                         Expression rowFilter, boolean caseSensitive, boolean colStats,
-                        Collection<String> selectedColumns) {
+                        Collection<String> selectedColumns, ImmutableMap<String, String> options) {
     this.ops = ops;
     this.table = table;
     this.snapshotId = snapshotId;
@@ -75,6 +77,7 @@ abstract class BaseTableScan implements TableScan {
     this.caseSensitive = caseSensitive;
     this.colStats = colStats;
     this.selectedColumns = selectedColumns;
+    this.options = options != null ? options : ImmutableMap.of();
   }
 
   @SuppressWarnings("checkstyle:HiddenField")
@@ -83,7 +86,8 @@ abstract class BaseTableScan implements TableScan {
   @SuppressWarnings("checkstyle:HiddenField")
   protected abstract TableScan newRefinedScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns);
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      ImmutableMap<String, String> options);
 
   @SuppressWarnings("checkstyle:HiddenField")
   protected abstract CloseableIterable<FileScanTask> planFiles(
@@ -100,7 +104,8 @@ abstract class BaseTableScan implements TableScan {
         "Cannot override snapshot, already set to id=%s", scanSnapshotId);
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
-    return newRefinedScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    return newRefinedScan(
+        ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
@@ -124,30 +129,41 @@ abstract class BaseTableScan implements TableScan {
   }
 
   @Override
+  public TableScan option(String property, String value) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.putAll(options);
+    builder.put(property, value);
+
+    return newRefinedScan(
+        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, builder.build());
+  }
+
+  @Override
   public TableScan project(Schema projectedSchema) {
     return newRefinedScan(
-        ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns);
+        ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
   public TableScan caseSensitive(boolean scanCaseSensitive) {
-    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
+    return newRefinedScan(
+        ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
   public TableScan includeColumnStats() {
-    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
+    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns, options);
   }
 
   @Override
   public TableScan select(Collection<String> columns) {
-    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
+    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns, options);
   }
 
   @Override
   public TableScan filter(Expression expr) {
-    return newRefinedScan(
-        ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats, selectedColumns);
+    return newRefinedScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats,
+        selectedColumns, options);
   }
 
   @Override
@@ -176,11 +192,26 @@ abstract class BaseTableScan implements TableScan {
 
   @Override
   public CloseableIterable<CombinedScanTask> planTasks() {
-    long splitSize = targetSplitSize(ops);
-    int lookback = ops.current().propertyAsInt(
-        TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
-    long openFileCost = ops.current().propertyAsLong(
-        TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    long splitSize;
+    if (options.containsKey(TableProperties.SPLIT_SIZE)) {
+      splitSize = Long.parseLong(options.get(TableProperties.SPLIT_SIZE));
+    } else {
+      splitSize = targetSplitSize(ops);
+    }
+    int lookback;
+    if (options.containsKey(TableProperties.SPLIT_LOOKBACK)) {
+      lookback = Integer.parseInt(options.get(TableProperties.SPLIT_LOOKBACK));
+    } else {
+      lookback = ops.current().propertyAsInt(
+          TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    }
+    long openFileCost;
+    if (options.containsKey(TableProperties.SPLIT_OPEN_FILE_COST)) {
+      openFileCost = Long.parseLong(options.get(TableProperties.SPLIT_OPEN_FILE_COST));
+    } else {
+      openFileCost = ops.current().propertyAsLong(
+          TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    }
 
     Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);
 
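Note that option() is implemented like every other refinement above: it copies the current option map into an ImmutableMap.Builder, adds the incoming pair, and hands the result to newRefinedScan. Scans therefore stay immutable; the receiver is never modified. For illustration:

    TableScan base = table.newScan();
    TableScan tuned = base.option(TableProperties.SPLIT_LOOKBACK, "1");
    // 'base' still plans with the table's own properties;
    // only 'tuned' carries the lookback override.

One wrinkle worth knowing: Guava's ImmutableMap.Builder.build() rejects duplicate keys, so re-setting the same property twice on one scan chain will throw IllegalArgumentException rather than keep the latest value.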
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 6958992..21d8da6 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.avro.Avro;
@@ -83,17 +84,19 @@ class DataFilesTable extends BaseMetadataTable {
 
     private FilesTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema,
+        ImmutableMap<String, String> options) {
+      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
       this.fileSchema = fileSchema;
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        ImmutableMap<String, String> options) {
       return new FilesTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema);
+          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema, options);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 4e9e82f..bbb50d3 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.List;
@@ -54,16 +55,17 @@ public class DataTableScan extends BaseTableScan {
 
   protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
                           Expression rowFilter, boolean caseSensitive, boolean colStats,
-                          Collection<String> selectedColumns) {
-    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+                          Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
   protected TableScan newRefinedScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      ImmutableMap<String, String> options) {
     return new DataTableScan(
-        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
   }
 
   public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index f3fe85e..9f91c12 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.avro.Avro;
@@ -82,16 +83,18 @@ class ManifestEntriesTable extends BaseMetadataTable {
 
     private EntriesTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        ImmutableMap<String, String> options) {
+      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        ImmutableMap<String, String> options) {
       return new EntriesTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
index c29c14b..2f1f148 100644
--- a/core/src/main/java/org/apache/iceberg/StaticTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
 import java.util.function.Function;
 import org.apache.iceberg.expressions.Expression;
@@ -37,8 +38,8 @@ class StaticTableScan extends BaseTableScan {
   private StaticTableScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
       boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-      Function<StaticTableScan, DataTask> buildTask) {
-    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+      Function<StaticTableScan, DataTask> buildTask, ImmutableMap<String, String> options) {
+    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
     this.buildTask = buildTask;
   }
 
@@ -50,9 +51,10 @@ class StaticTableScan extends BaseTableScan {
   @Override
   protected TableScan newRefinedScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      ImmutableMap<String, String> options) {
     return new StaticTableScan(
-        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask);
+        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask, options);
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java b/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
index c8f0721..0066afd 100644
--- a/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Before;
@@ -101,6 +102,45 @@ public class TestSplitPlanning {
     Assert.assertEquals(1, Iterables.size(table.newScan().planTasks()));
   }
 
+  @Test
+  public void testSplitPlanningWithOverridenSize() {
+    List<DataFile> files128Mb = newFiles(4, 128 * 1024 * 1024);
+    appendFiles(files128Mb);
+    // we expect 2 bins since we are overriding split size in scan with 256MB
+    TableScan scan = table.newScan()
+        .option(TableProperties.SPLIT_SIZE, String.valueOf(256L * 1024 * 1024));
+    Assert.assertEquals(2, Iterables.size(scan.planTasks()));
+  }
+
+  @Test
+  public void testSplitPlanningWithOverridenLookback() {
+    List<DataFile> files120Mb = newFiles(1, 120 * 1024 * 1024);
+    List<DataFile> file128Mb = newFiles(1, 128 * 1024 * 1024);
+    Iterable<DataFile> files = Iterables.concat(files120Mb, file128Mb);
+    appendFiles(files);
+    // we expect 2 bins from non-overriden table properties
+    TableScan scan = table.newScan()
+        .option(TableProperties.SPLIT_LOOKBACK, "1");
+    CloseableIterable<CombinedScanTask> tasks = scan.planTasks();
+    Assert.assertEquals(2, Iterables.size(tasks));
+
+    // since lookback was overridden to 1, we expect the first bin to be the largest of the two.
+    CombinedScanTask combinedScanTask = tasks.iterator().next();
+    FileScanTask task = combinedScanTask.files().iterator().next();
+    Assert.assertEquals(128 * 1024 * 1024, task.length());
+  }
+
+  @Test
+  public void testSplitPlanningWithOverridenOpenCostSize() {
+    List<DataFile> files16Mb = newFiles(16, 16 * 1024 * 1024);
+    appendFiles(files16Mb);
+    // we expect 4 bins since we are overriding open file cost in scan with a cost of 32MB
+    // we can fit at most 128Mb/32Mb = 4 files per bin
+    TableScan scan = table.newScan()
+        .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(32L * 1024 * 1024));
+    Assert.assertEquals(4, Iterables.size(scan.planTasks()));
+  }
+
   private void appendFiles(Iterable<DataFile> files) {
     AppendFiles appendFiles = table.newAppend();
     files.forEach(appendFiles::appendFile);
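The expected bin counts in these tests follow directly from the weight function in planTasks, where each file weighs max(file.length(), openFileCost). As a worked check of the open-file-cost case, under the 128 MB default target split size:

    // weightFunc = file -> Math.max(file.length(), openFileCost)
    // 16 files x max(16 MB, 32 MB) = 16 x 32 MB = 512 MB total weight
    // 512 MB / 128 MB per bin = 4 bins
    // equivalently: at most 128 MB / 32 MB = 4 files fit in each bin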
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 3b54405..3e939fe 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -10,7 +10,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | ---------------------------- | ------------------ | ------------------------------------------------------ |
 | read.split.target-size       | 134217728 (128 MB) | Target size when combining input splits                |
 | read.split.planning-lookback | 10                 | Number of bins to consider when combining input splits |
-
+| read.split.open-file-cost    | 4194304 (4 MB)     | The estimated cost to open a file, used as a minimum weight when combining splits. |
 
 ### Write properties
 
@@ -52,10 +52,13 @@ spark.read
     .load("db.table")
 ```
 
-| Spark option    | Default  | Description                                                                               |
-| --------------- | -------- | ----------------------------------------------------------------------------------------- |
-| snapshot-id     | (latest) | Snapshot ID of the table snapshot to read                                                 |
-| as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
+| Spark option    | Default               | Description                                                                               |
+| --------------- | --------------------- | ----------------------------------------------------------------------------------------- |
+| snapshot-id     | (latest)              | Snapshot ID of the table snapshot to read                                                 |
+| as-of-timestamp | (latest)              | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
+| split-size      | As per table property | Overrides this table's read.split.target-size                                             |
+| lookback        | As per table property | Overrides this table's read.split.planning-lookback                                       |
+| file-open-cost  | As per table property | Overrides this table's read.split.open-file-cost                                          |
 
 ### Write options
 
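On the Spark side, these scan overrides surface as per-read DataFrame options, which the Reader.java change below wires up. A sketch of the read path with all three overrides, assuming an active SparkSession named "spark" (the table name and values are illustrative only):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("split-size", String.valueOf(64L * 1024 * 1024))     // read.split.target-size
        .option("lookback", "5")                                     // read.split.planning-lookback
        .option("file-open-cost", String.valueOf(8L * 1024 * 1024))  // read.split.open-file-cost
        .load("db.table");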
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 91359ff..43c966a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.common.DynMethods;
@@ -99,6 +100,9 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private final Table table;
   private final Long snapshotId;
   private final Long asOfTimestamp;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
   private final boolean caseSensitive;
@@ -119,6 +123,12 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
     }
+
+    // look for split behavior overrides in options
+    this.splitSize = options.get("split-size").map(Long::parseLong).orElse(null);
+    this.splitLookback = options.get("lookback").map(Integer::parseInt).orElse(null);
+    this.splitOpenFileCost = options.get("file-open-cost").map(Long::parseLong).orElse(null);
+
     this.schema = table.schema();
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
@@ -233,6 +243,18 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
         scan = scan.asOfTime(asOfTimestamp);
       }
 
+      if (splitSize != null) {
+        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+      }
+
+      if (splitLookback != null) {
+        scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
+      }
+
+      if (splitOpenFileCost != null) {
+        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+      }
+
       if (filterExpressions != null) {
         for (Expression filter : filterExpressions) {
           scan = scan.filter(filter);
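Because the three overrides are parsed eagerly in the Reader constructor via map(Long::parseLong) / map(Integer::parseInt), a malformed value fails fast with a NumberFormatException when the reader is created, typically at load() time, rather than surfacing later during task planning. Illustrative only:

    // Throws NumberFormatException as soon as Spark constructs the Reader:
    spark.read()
        .format("iceberg")
        .option("split-size", "not-a-number")
        .load("db.table");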
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index cbb5351..6d71e04 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -170,4 +170,32 @@ public class TestDataSourceOptions {
       sparkHadoopConf.set("fs.default.name", originalDefaultFS);
     }
   }
+
+  @Test
+  public void testSplitOptionsOverridesTableProperties() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SPLIT_SIZE, String.valueOf(128L * 1024 * 1024)); // 128Mb
+    tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .option("split-size", String.valueOf(562L)) // 562 bytes is the size of SimpleRecord(1,"a")
+        .load(tableLocation);
+
+    Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions());
+  }
 }