You are viewing a plain-text version of this content. (The link to its canonical version was lost in the plain-text conversion.)
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/09 17:03:37 UTC
[iceberg] branch master updated: Spark: Support reading from a tag or branch (#5150)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 305e320b0f Spark: Support reading from a tag or branch (#5150)
305e320b0f is described below
commit 305e320b0fc8fb7aebc447fc291777c27fb8486b
Author: Namratha Mysore Keshavaprakash <nm...@gmail.com>
AuthorDate: Wed Nov 9 09:03:30 2022 -0800
Spark: Support reading from a tag or branch (#5150)
---
.../org/apache/iceberg/spark/SparkReadConf.java | 8 ++
.../org/apache/iceberg/spark/SparkReadOptions.java | 6 +
.../iceberg/spark/source/SparkBatchQueryScan.java | 21 ++-
.../iceberg/spark/source/SparkScanBuilder.java | 32 ++++-
.../spark/source/TestSnapshotSelection.java | 144 +++++++++++++++++++++
5 files changed, 203 insertions(+), 8 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index ef262e11f0..85c71827d7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -94,6 +94,14 @@ public class SparkReadConf {
return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
}
+ public String branch() {
+ return confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
+ }
+
+ public String tag() {
+ return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
+ }
+
public String fileScanTaskSetId() {
return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9515a48bc2..96e09d70ef 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,6 +35,12 @@ public class SparkReadOptions {
// A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
+ // Name of a branch to read from; the scan reads the snapshot the branch points to. Cannot be combined with tag or as-of-timestamp
+ public static final String BRANCH = "branch";
+
+ // Name of a tag to read from; the scan reads the snapshot the tag references. Cannot be combined with branch or as-of-timestamp
+ public static final String TAG = "tag";
+
// Overrides the table's read.split.target-size and read.split.metadata-target-size
public static final String SPLIT_SIZE = "split-size";
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 13e7bb82fa..fb83636b4c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -68,6 +68,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
private final Long startSnapshotId;
private final Long endSnapshotId;
private final Long asOfTimestamp;
+ private final String branch;
+ private final String tag;
private final List<Expression> runtimeFilterExpressions;
private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -89,6 +91,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
+ this.branch = readConf.branch();
+ this.tag = readConf.tag();
this.runtimeFilterExpressions = Lists.newArrayList();
if (scan == null) {
@@ -247,6 +251,14 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
Snapshot snapshot = table().snapshot(snapshotIdAsOfTime);
return estimateStatistics(snapshot);
+ } else if (branch != null) {
+ Snapshot snapshot = table().snapshot(branch);
+ return estimateStatistics(snapshot);
+
+ } else if (tag != null) {
+ Snapshot snapshot = table().snapshot(tag);
+ return estimateStatistics(snapshot);
+
} else {
Snapshot snapshot = table().currentSnapshot();
return estimateStatistics(snapshot);
@@ -254,6 +266,7 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
}
@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -272,7 +285,9 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
&& Objects.equals(snapshotId, that.snapshotId)
&& Objects.equals(startSnapshotId, that.startSnapshotId)
&& Objects.equals(endSnapshotId, that.endSnapshotId)
- && Objects.equals(asOfTimestamp, that.asOfTimestamp);
+ && Objects.equals(asOfTimestamp, that.asOfTimestamp)
+ && Objects.equals(branch, that.branch)
+ && Objects.equals(tag, that.tag);
}
@Override
@@ -285,7 +300,9 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
snapshotId,
startSnapshotId,
endSnapshotId,
- asOfTimestamp);
+ asOfTimestamp,
+ branch,
+ tag);
}
@Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index b291a8e267..150da814ba 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -183,6 +183,8 @@ public class SparkScanBuilder
public Scan build() {
Long snapshotId = readConf.snapshotId();
Long asOfTimestamp = readConf.asOfTimestamp();
+ String branch = readConf.branch();
+ String tag = readConf.tag();
Preconditions.checkArgument(
snapshotId == null || asOfTimestamp == null,
@@ -226,6 +228,14 @@ public class SparkScanBuilder
scan = scan.asOfTime(asOfTimestamp);
}
+ if (branch != null) {
+ scan = scan.useRef(branch);
+ }
+
+ if (tag != null) {
+ scan = scan.useRef(tag);
+ }
+
if (startSnapshotId != null) {
if (endSnapshotId != null) {
scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -241,10 +251,15 @@ public class SparkScanBuilder
public Scan buildChangelogScan() {
Preconditions.checkArgument(
- readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
- "Cannot set neither %s nor %s for changelogs",
+ readConf.snapshotId() == null
+ && readConf.asOfTimestamp() == null
+ && readConf.branch() == null
+ && readConf.tag() == null,
+ "Cannot set %s, %s, %s or %s for changelogs",
SparkReadOptions.SNAPSHOT_ID,
- SparkReadOptions.AS_OF_TIMESTAMP);
+ SparkReadOptions.AS_OF_TIMESTAMP,
+ SparkReadOptions.BRANCH,
+ SparkReadOptions.TAG);
Long startSnapshotId = readConf.startSnapshotId();
Long endSnapshotId = readConf.endSnapshotId();
@@ -273,10 +288,15 @@ public class SparkScanBuilder
public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
- readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
- "Cannot set time travel options %s and %s for row-level command scans",
+ readConf.snapshotId() == null
+ && readConf.asOfTimestamp() == null
+ && readConf.branch() == null
+ && readConf.tag() == null,
+ "Cannot set time travel options %s, %s, %s and %s for row-level command scans",
SparkReadOptions.SNAPSHOT_ID,
- SparkReadOptions.AS_OF_TIMESTAMP);
+ SparkReadOptions.AS_OF_TIMESTAMP,
+ SparkReadOptions.BRANCH,
+ SparkReadOptions.TAG);
Preconditions.checkArgument(
readConf.startSnapshotId() == null && readConf.endSnapshotId() == null,
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 9661cfe20b..0b7348fa07 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,4 +226,148 @@ public class TestSnapshotSelection {
.hasMessageContaining("Cannot specify both snapshot-id")
.hasMessageContaining("and as-of-timestamp");
}
+
+ @Test
+ public void testSnapshotSelectionByTag() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, tableLocation);
+
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+ firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+ // tag the first snapshot; the tag keeps pointing at it after the next append
+ table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+ // produce the second snapshot; it must not be visible through the tag
+ List<SimpleRecord> secondBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+ Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+ secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+ // verify that reading by tag returns only the records of the tagged (first) snapshot
+ Dataset<Row> tagSnapshotResult =
+ spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
+ List<SimpleRecord> tagSnapshotRecords =
+ tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ List<SimpleRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(firstBatchRecords);
+ Assert.assertEquals(
+ "Rows read from the tagged snapshot should match", expectedRecords, tagSnapshotRecords);
+ }
+
+ @Test
+ public void testSnapshotSelectionByBranch() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, tableLocation);
+
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+ firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+ // create a branch at the first snapshot; the next append does not advance it
+ table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+ // produce the second snapshot; it must not be visible through the branch
+ List<SimpleRecord> secondBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+ Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+ secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+ // verify that reading by branch returns only the records of the branch's (first) snapshot
+ Dataset<Row> branchSnapshotResult =
+ spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+ List<SimpleRecord> branchSnapshotRecords =
+ branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ List<SimpleRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(firstBatchRecords);
+ Assert.assertEquals(
+ "Rows read from the branch snapshot should match", expectedRecords, branchSnapshotRecords);
+ }
+
+ @Test
+ public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, tableLocation);
+
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+ firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+ // point both a branch and a tag at the same (first) snapshot
+ table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+ table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+ // a scan can be bound to only one ref, so reading with both tag and branch must fail
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.TAG, "tag")
+ .option(SparkReadOptions.BRANCH, "branch")
+ .load(tableLocation)
+ .show())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+ }
+
+ @Test
+ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, tableLocation);
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+ firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+ // capture a time after the first commit so as-of-timestamp resolves to an existing snapshot
+ long timestamp = System.currentTimeMillis();
+ table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+ table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+ // as-of-timestamp combined with a branch must be rejected
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+ .option(SparkReadOptions.BRANCH, "branch")
+ .load(tableLocation)
+ .show())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+ // as-of-timestamp combined with a tag must be rejected
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+ .option(SparkReadOptions.TAG, "tag")
+ .load(tableLocation)
+ .show())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+ }
}