Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/06 19:51:16 UTC
[incubator-iceberg] branch master updated: Spark: Add incremental scan support (#829)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 58a2574 Spark: Add incremental scan support (#829)
58a2574 is described below
commit 58a257460e6fe04cb6eadd15153f88997852c86d
Author: Saisai Shao <je...@tencent.com>
AuthorDate: Sat Mar 7 03:50:37 2020 +0800
Spark: Add incremental scan support (#829)
Adds start-snapshot-id and end-snapshot-id options to configure an incremental scan.
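For example, an incremental read using the new options looks like the following (a minimal sketch, not part of this commit; tableLocation, startId, and endId stand in for a real table path and snapshot IDs on the table's current history):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().getOrCreate();

    // Read only the rows appended after startId (exclusive), up to endId (inclusive).
    // end-snapshot-id is optional; omitting it scans up to the current snapshot.
    Dataset<Row> appended = spark.read()
        .format("iceberg")
        .option("start-snapshot-id", Long.toString(startId))
        .option("end-snapshot-id", Long.toString(endId))
        .load(tableLocation);

Combining either option with snapshot-id or as-of-timestamp, or passing end-snapshot-id alone, is rejected with an IllegalArgumentException, as the Reader changes below show.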
---
.../org/apache/iceberg/spark/source/Reader.java | 24 ++++++
.../spark/source/TestDataSourceOptions.java | 86 ++++++++++++++++++++++
2 files changed, 110 insertions(+)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 270172e..51f6979 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -112,6 +112,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private final Table table;
private final Long snapshotId;
+ private final Long startSnapshotId;
+ private final Long endSnapshotId;
private final Long asOfTimestamp;
private final Long splitSize;
private final Integer splitLookback;
@@ -139,6 +141,20 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
"Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
}
+ this.startSnapshotId = options.get("start-snapshot-id").map(Long::parseLong).orElse(null);
+ this.endSnapshotId = options.get("end-snapshot-id").map(Long::parseLong).orElse(null);
+ if (snapshotId != null || asOfTimestamp != null) {
+ if (startSnapshotId != null || endSnapshotId != null) {
+ throw new IllegalArgumentException(
+ "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
+ "as-of-timestamp is specified");
+ }
+ } else {
+ if (startSnapshotId == null && endSnapshotId != null) {
+ throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
+ }
+ }
+
// look for split behavior overrides in options
this.splitSize = options.get("split-size").map(Long::parseLong).orElse(null);
this.splitLookback = options.get("lookback").map(Integer::parseInt).orElse(null);
@@ -274,6 +290,14 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
scan = scan.asOfTime(asOfTimestamp);
}
+ if (startSnapshotId != null) {
+ if (endSnapshotId != null) {
+ scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+ } else {
+ scan = scan.appendsAfter(startSnapshotId);
+ }
+ }
+
if (splitSize != null) {
scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
}
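The option handling above delegates to the existing incremental TableScan API; the roughly equivalent direct calls are sketched below (table, startSnapshotId, and endSnapshotId are illustrative placeholders):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;

    // appendsBetween scans appends in the range (startSnapshotId, endSnapshotId];
    // appendsAfter scans appends in (startSnapshotId, current snapshot].
    TableScan between = table.newScan().appendsBetween(startSnapshotId, endSnapshotId);
    TableScan after = table.newScan().appendsAfter(startSnapshotId);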
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 9160604..2d3301f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
@@ -34,6 +35,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -198,4 +200,88 @@ public class TestDataSourceOptions {
Assert.assertEquals("Spark partitions should match", 2, resultDf.javaRDD().getNumPartitions());
}
+
+ @Test
+ public void testIncrementalScanOptions() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ Table table = tables.create(SCHEMA, spec, options, tableLocation);
+
+ List<SimpleRecord> expectedRecords = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c"),
+ new SimpleRecord(4, "d")
+ );
+ for (SimpleRecord record : expectedRecords) {
+ Dataset<Row> originalDf = spark.createDataFrame(Lists.newArrayList(record), SimpleRecord.class);
+ originalDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation);
+ }
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table); // ordered newest first
+
+ // start-snapshot-id and snapshot-id are both configured.
+ AssertHelpers.assertThrows(
+ "Check both start-snapshot-id and snapshot-id are configured",
+ IllegalArgumentException.class,
+ "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan",
+ () -> {
+ spark.read()
+ .format("iceberg")
+ .option("snapshot-id", snapshotIds.get(3).toString())
+ .option("start-snapshot-id", snapshotIds.get(3).toString())
+ .load(tableLocation);
+ });
+
+ // end-snapshot-id and as-of-timestamp are both configured.
+ AssertHelpers.assertThrows(
+ "Check both start-snapshot-id and snapshot-id are configured",
+ IllegalArgumentException.class,
+ "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan",
+ () -> {
+ spark.read()
+ .format("iceberg")
+ .option("as-of-timestamp", Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
+ .option("end-snapshot-id", snapshotIds.get(2).toString())
+ .load(tableLocation);
+ });
+
+ // only end-snapshot-id is configured.
+ AssertHelpers.assertThrows(
+ "Check both start-snapshot-id and snapshot-id are configured",
+ IllegalArgumentException.class,
+ "Cannot only specify option end-snapshot-id to do incremental scan",
+ () -> {
+ spark.read()
+ .format("iceberg")
+ .option("end-snapshot-id", snapshotIds.get(2).toString())
+ .load(tableLocation);
+ });
+
+ // test (1st snapshot, current snapshot] incremental scan.
+ List<SimpleRecord> result = spark.read()
+ .format("iceberg")
+ .option("start-snapshot-id", snapshotIds.get(3).toString())
+ .load(tableLocation)
+ .orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
+
+ // test (2nd snapshot, 3rd snapshot] incremental scan.
+ List<SimpleRecord> result1 = spark.read()
+ .format("iceberg")
+ .option("start-snapshot-id", snapshotIds.get(2).toString())
+ .option("end-snapshot-id", snapshotIds.get(1).toString())
+ .load(tableLocation)
+ .orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+ }
}
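Outside of the test, callers can discover snapshot IDs for these options the same way the test does, via SnapshotUtil (a short sketch; table stands in for an open Iceberg Table):

    import java.util.List;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.util.SnapshotUtil;

    // Ancestor snapshot IDs of the current snapshot, ordered newest first,
    // so the oldest snapshot in the history is at the end of the list.
    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
    long newestId = snapshotIds.get(0);
    long oldestId = snapshotIds.get(snapshotIds.size() - 1);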