You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/06 19:51:16 UTC

[incubator-iceberg] branch master updated: Spark: Add incremental scan support (#829)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 58a2574  Spark: Add incremental scan support (#829)
58a2574 is described below

commit 58a257460e6fe04cb6eadd15153f88997852c86d
Author: Saisai Shao <je...@tencent.com>
AuthorDate: Sat Mar 7 03:50:37 2020 +0800

    Spark: Add incremental scan support (#829)
    
    Adds start-snapshot-id and end-snapshot-id options to configure an incremental scan.
---
 .../org/apache/iceberg/spark/source/Reader.java    | 24 ++++++
 .../spark/source/TestDataSourceOptions.java        | 86 ++++++++++++++++++++++
 2 files changed, 110 insertions(+)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 270172e..51f6979 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -112,6 +112,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
   private final Table table;
   private final Long snapshotId;
+  private final Long startSnapshotId;
+  private final Long endSnapshotId;
   private final Long asOfTimestamp;
   private final Long splitSize;
   private final Integer splitLookback;
@@ -139,6 +141,20 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
     }
 
+    this.startSnapshotId = options.get("start-snapshot-id").map(Long::parseLong).orElse(null);
+    this.endSnapshotId = options.get("end-snapshot-id").map(Long::parseLong).orElse(null);
+    if (snapshotId != null || asOfTimestamp != null) {
+      if (startSnapshotId != null || endSnapshotId != null) {
+        throw new IllegalArgumentException(
+            "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
+                "as-of-timestamp is specified");
+      }
+    } else {
+      if (startSnapshotId == null && endSnapshotId != null) {
+        throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
+      }
+    }
+
     // look for split behavior overrides in options
     this.splitSize = options.get("split-size").map(Long::parseLong).orElse(null);
     this.splitLookback = options.get("lookback").map(Integer::parseInt).orElse(null);
@@ -274,6 +290,14 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
         scan = scan.asOfTime(asOfTimestamp);
       }
 
+      if (startSnapshotId != null) {
+        if (endSnapshotId != null) {
+          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+        } else {
+          scan = scan.appendsAfter(startSnapshotId);
+        }
+      }
+
       if (splitSize != null) {
         scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
       }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 9160604..2d3301f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
@@ -34,6 +35,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -198,4 +200,88 @@ public class TestDataSourceOptions {
 
     Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions());
   }
+
+  /** Validates the start-snapshot-id / end-snapshot-id read options and the rows incremental scans return. */
+  @Test
+  public void testIncrementalScanOptions() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "d")
+    );
+    // One append per record; the assertions below expect each append to have produced its own snapshot.
+    for (SimpleRecord record : expectedRecords) {
+      Dataset<Row> originalDf = spark.createDataFrame(Lists.newArrayList(record), SimpleRecord.class);
+      originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+    }
+    // The indexes used below treat this list as newest-first: get(3) is the 1st snapshot, get(0) the current one.
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+
+    // start-snapshot-id cannot be combined with snapshot-id.
+    AssertHelpers.assertThrows(
+        "Check both start-snapshot-id and snapshot-id are configured",
+        IllegalArgumentException.class,
+        "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan",
+        () -> {
+          spark.read()
+              .format("iceberg")
+              .option("snapshot-id", snapshotIds.get(3).toString())
+              .option("start-snapshot-id", snapshotIds.get(3).toString())
+              .load(tableLocation);
+        });
+
+    // end-snapshot-id cannot be combined with as-of-timestamp.
+    AssertHelpers.assertThrows(
+        "Check both end-snapshot-id and as-of-timestamp are configured",
+        IllegalArgumentException.class,
+        "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan",
+        () -> {
+          spark.read()
+              .format("iceberg")
+              .option("as-of-timestamp", Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
+              .option("end-snapshot-id", snapshotIds.get(2).toString())
+              .load(tableLocation);
+        });
+
+    // end-snapshot-id on its own, without start-snapshot-id, is rejected.
+    AssertHelpers.assertThrows(
+        "Check only end-snapshot-id is configured",
+        IllegalArgumentException.class,
+        "Cannot only specify option end-snapshot-id to do incremental scan",
+        () -> {
+          spark.read()
+              .format("iceberg")
+              .option("end-snapshot-id", snapshotIds.get(2).toString())
+              .load(tableLocation);
+        });
+
+    // (1st snapshot, current snapshot] incremental scan: expects the records appended after the 1st snapshot.
+    List<SimpleRecord> result = spark.read()
+        .format("iceberg")
+        .option("start-snapshot-id", snapshotIds.get(3).toString())
+        .load(tableLocation)
+        .orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
+
+    // (2nd snapshot, 3rd snapshot] incremental scan: expects only the record appended by the 3rd snapshot.
+    List<SimpleRecord> result1 = spark.read()
+        .format("iceberg")
+        .option("start-snapshot-id", snapshotIds.get(2).toString())
+        .option("end-snapshot-id", snapshotIds.get(1).toString())
+        .load(tableLocation)
+        .orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+  }
 }