Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/09 17:03:37 UTC

[iceberg] branch master updated: Spark: Support reading from a tag or branch (#5150)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 305e320b0f Spark: Support reading from a tag or branch (#5150)
305e320b0f is described below

commit 305e320b0fc8fb7aebc447fc291777c27fb8486b
Author: Namratha Mysore Keshavaprakash <nm...@gmail.com>
AuthorDate: Wed Nov 9 09:03:30 2022 -0800

    Spark: Support reading from a tag or branch (#5150)
---
 .../org/apache/iceberg/spark/SparkReadConf.java    |   8 ++
 .../org/apache/iceberg/spark/SparkReadOptions.java |   6 +
 .../iceberg/spark/source/SparkBatchQueryScan.java  |  21 ++-
 .../iceberg/spark/source/SparkScanBuilder.java     |  32 ++++-
 .../spark/source/TestSnapshotSelection.java        | 144 +++++++++++++++++++++
 5 files changed, 203 insertions(+), 8 deletions(-)
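
For context, a minimal sketch of how the new read options are used from the DataFrame reader, assuming an existing SparkSession `spark`, a table location string `tableLocation`, and refs named "my-branch" / "my-tag" (all placeholders; the option keys match SparkReadOptions.BRANCH and SparkReadOptions.TAG added below):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Read the table state at the head of a named branch (placeholder ref name).
    Dataset<Row> branchDf =
        spark.read().format("iceberg").option("branch", "my-branch").load(tableLocation);

    // Read the table state pinned by a named tag (placeholder ref name).
    Dataset<Row> tagDf =
        spark.read().format("iceberg").option("tag", "my-tag").load(tableLocation);

Combining branch with tag, or either option with as-of-timestamp, is rejected at scan-building time; see the new tests at the end of this diff.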

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index ef262e11f0..85c71827d7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -94,6 +94,14 @@ public class SparkReadConf {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
+  public String branch() {
+    return confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
+  }
+
+  public String tag() {
+    return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
+  }
+
   public String fileScanTaskSetId() {
     return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
   }
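
These options resolve refs that already exist on the table; a minimal sketch of creating such refs through the snapshot management API, mirroring the new tests below (the table handle and ref names are placeholders):

    import org.apache.iceberg.Table;

    // Create a tag and a branch pointing at the current snapshot (placeholder names).
    long snapshotId = table.currentSnapshot().snapshotId();
    table.manageSnapshots().createTag("my-tag", snapshotId).commit();
    table.manageSnapshots().createBranch("my-branch", snapshotId).commit();
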
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9515a48bc2..96e09d70ef 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,6 +35,12 @@ public class SparkReadOptions {
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
+  // Branch to read from
+  public static final String BRANCH = "branch";
+
+  // Tag to read from
+  public static final String TAG = "tag";
+
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 13e7bb82fa..fb83636b4c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -68,6 +68,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
   private final Long startSnapshotId;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
+  private final String branch;
+  private final String tag;
   private final List<Expression> runtimeFilterExpressions;
 
   private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -89,6 +91,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
+    this.branch = readConf.branch();
+    this.tag = readConf.tag();
     this.runtimeFilterExpressions = Lists.newArrayList();
 
     if (scan == null) {
@@ -247,6 +251,14 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
       Snapshot snapshot = table().snapshot(snapshotIdAsOfTime);
       return estimateStatistics(snapshot);
 
+    } else if (branch != null) {
+      Snapshot snapshot = table().snapshot(branch);
+      return estimateStatistics(snapshot);
+
+    } else if (tag != null) {
+      Snapshot snapshot = table().snapshot(tag);
+      return estimateStatistics(snapshot);
+
     } else {
       Snapshot snapshot = table().currentSnapshot();
       return estimateStatistics(snapshot);
@@ -254,6 +266,7 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public boolean equals(Object o) {
     if (this == o) {
       return true;
@@ -272,7 +285,9 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
         && Objects.equals(snapshotId, that.snapshotId)
         && Objects.equals(startSnapshotId, that.startSnapshotId)
         && Objects.equals(endSnapshotId, that.endSnapshotId)
-        && Objects.equals(asOfTimestamp, that.asOfTimestamp);
+        && Objects.equals(asOfTimestamp, that.asOfTimestamp)
+        && Objects.equals(branch, that.branch)
+        && Objects.equals(tag, that.tag);
   }
 
   @Override
@@ -285,7 +300,9 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
         snapshotId,
         startSnapshotId,
         endSnapshotId,
-        asOfTimestamp);
+        asOfTimestamp,
+        branch,
+        tag);
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index b291a8e267..150da814ba 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -183,6 +183,8 @@ public class SparkScanBuilder
   public Scan build() {
     Long snapshotId = readConf.snapshotId();
     Long asOfTimestamp = readConf.asOfTimestamp();
+    String branch = readConf.branch();
+    String tag = readConf.tag();
 
     Preconditions.checkArgument(
         snapshotId == null || asOfTimestamp == null,
@@ -226,6 +228,14 @@ public class SparkScanBuilder
       scan = scan.asOfTime(asOfTimestamp);
     }
 
+    if (branch != null) {
+      scan = scan.useRef(branch);
+    }
+
+    if (tag != null) {
+      scan = scan.useRef(tag);
+    }
+
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -241,10 +251,15 @@ public class SparkScanBuilder
 
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
-        "Cannot set neither %s nor %s for changelogs",
+        readConf.snapshotId() == null
+            && readConf.asOfTimestamp() == null
+            && readConf.branch() == null
+            && readConf.tag() == null,
+        "Cannot set neither %s, %s, %s and %s for changelogs",
         SparkReadOptions.SNAPSHOT_ID,
-        SparkReadOptions.AS_OF_TIMESTAMP);
+        SparkReadOptions.AS_OF_TIMESTAMP,
+        SparkReadOptions.BRANCH,
+        SparkReadOptions.TAG);
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
@@ -273,10 +288,15 @@ public class SparkScanBuilder
 
   public Scan buildMergeOnReadScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
-        "Cannot set time travel options %s and %s for row-level command scans",
+        readConf.snapshotId() == null
+            && readConf.asOfTimestamp() == null
+            && readConf.branch() == null
+            && readConf.tag() == null,
+        "Cannot set time travel options %s, %s, %s and %s for row-level command scans",
         SparkReadOptions.SNAPSHOT_ID,
-        SparkReadOptions.AS_OF_TIMESTAMP);
+        SparkReadOptions.AS_OF_TIMESTAMP,
+        SparkReadOptions.BRANCH,
+        SparkReadOptions.TAG);
 
     Preconditions.checkArgument(
         readConf.startSnapshotId() == null && readConf.endSnapshotId() == null,
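
The builder above delegates ref selection to the core scan API via useRef; a minimal sketch of the equivalent direct call (the table handle and ref name are placeholders):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;

    // Plan a scan against the snapshot referenced by a named branch or tag.
    TableScan refScan = table.newScan().useRef("my-branch");
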
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 9661cfe20b..0b7348fa07 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,4 +226,148 @@ public class TestSnapshotSelection {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
+
+  @Test
+  public void testSnapshotSelectionByTag() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // verify records in the current snapshot by tag
+    Dataset<Row> currentSnapshotResult =
+        spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords =
+        currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByBranch() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // verify records in the current snapshot by branch
+    Dataset<Row> currentSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> currentSnapshotRecords =
+        currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.TAG, "tag")
+                    .option(SparkReadOptions.BRANCH, "branch")
+                    .load(tableLocation)
+                    .show())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+  }
+
+  @Test
+  public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    long timestamp = System.currentTimeMillis();
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+                    .option(SparkReadOptions.BRANCH, "branch")
+                    .load(tableLocation)
+                    .show())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+                    .option(SparkReadOptions.TAG, "tag")
+                    .load(tableLocation)
+                    .show())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+  }
 }