Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/02/14 15:45:16 UTC

[iceberg] branch master updated: Flink: use tag or branch to scan data (#5029)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cd3d240fb Flink: use tag or branch to scan data (#5029)
6cd3d240fb is described below

commit 6cd3d240fb0492ffb8164022bfd9b382c33d1c7f
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Tue Feb 14 23:45:07 2023 +0800

    Flink: use tag or branch to scan data (#5029)
---
 .../apache/iceberg/data/GenericAppenderHelper.java |  21 ++-
 .../org/apache/iceberg/flink/FlinkReadConf.java    |  16 +++
 .../org/apache/iceberg/flink/FlinkReadOptions.java |  12 ++
 .../apache/iceberg/flink/source/FlinkSource.java   |  20 +++
 .../iceberg/flink/source/FlinkSplitPlanner.java    |  29 ++++-
 .../apache/iceberg/flink/source/IcebergSource.java |  20 +++
 .../apache/iceberg/flink/source/ScanContext.java   |  89 ++++++++++++-
 .../flink/source/StreamingMonitorFunction.java     |  27 +++-
 .../apache/iceberg/flink/source/TestFlinkScan.java | 144 +++++++++++++++++++++
 .../iceberg/flink/source/TestFlinkSource.java      |   4 +
 .../iceberg/flink/source/TestStreamScanSql.java    |  71 +++++++++-
 .../flink/source/TestStreamingMonitorFunction.java |  36 ++++++
 12 files changed, 478 insertions(+), 11 deletions(-)
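
The change threads Iceberg snapshot refs (tags and branches) through both the
legacy FlinkSource and the FLIP-27 IcebergSource. As a hedged sketch of the
batch path this enables (the environment and loader setup are illustrative,
not part of the diff):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.FlinkSource;

    public class TagScanExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical table location; any TableLoader would do.
        TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t");
        DataStream<RowData> rows =
            FlinkSource.forRowData()
                .env(env)
                .tableLoader(loader)
                .tag("v1")         // or .branch("b1"); both name snapshot refs
                .streaming(false)  // batch only; streaming over refs is rejected for now
                .build();
        rows.print();
        env.execute("scan tag v1");
      }
    }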

diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index 96d0a96c72..eb8aefb046 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.FileAppender;
@@ -56,10 +57,11 @@ public class GenericAppenderHelper {
     this(table, fileFormat, tmp, null);
   }
 
-  public void appendToTable(DataFile... dataFiles) {
+  public void appendToTable(String branch, DataFile... dataFiles) {
     Preconditions.checkNotNull(table, "table not set");
 
-    AppendFiles append = table.newAppend();
+    AppendFiles append =
+        table.newAppend().toBranch(branch != null ? branch : SnapshotRef.MAIN_BRANCH);
 
     for (DataFile dataFile : dataFiles) {
       append = append.appendFile(dataFile);
@@ -68,8 +70,21 @@ public class GenericAppenderHelper {
     append.commit();
   }
 
+  public void appendToTable(DataFile... dataFiles) {
+    appendToTable(null, dataFiles);
+  }
+
   public void appendToTable(List<Record> records) throws IOException {
-    appendToTable(null, records);
+    appendToTable(null, null, records);
+  }
+
+  public void appendToTable(String branch, List<Record> records) throws IOException {
+    appendToTable(null, branch, records);
+  }
+
+  public void appendToTable(StructLike partition, String branch, List<Record> records)
+      throws IOException {
+    appendToTable(branch, writeFile(partition, records));
   }
 
   public void appendToTable(StructLike partition, List<Record> records) throws IOException {
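
GenericAppenderHelper keeps its old signatures and layers branch-aware
overloads on top; a null branch falls back to SnapshotRef.MAIN_BRANCH. A
hedged sketch of test usage, where table, fileFormat, TEMPORARY_FOLDER, and
records come from the surrounding fixture:

    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
    helper.appendToTable(records);           // unchanged: commits to main
    helper.appendToTable("audit", records);  // new: commits to the "audit" branch
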
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index e2cacc2adf..baef57a8e7 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -39,6 +39,22 @@ public class FlinkReadConf {
     return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
   }
 
+  public String tag() {
+    return confParser.stringConf().option(FlinkReadOptions.TAG.key()).parseOptional();
+  }
+
+  public String startTag() {
+    return confParser.stringConf().option(FlinkReadOptions.START_TAG.key()).parseOptional();
+  }
+
+  public String endTag() {
+    return confParser.stringConf().option(FlinkReadOptions.END_TAG.key()).parseOptional();
+  }
+
+  public String branch() {
+    return confParser.stringConf().option(FlinkReadOptions.BRANCH.key()).parseOptional();
+  }
+
   public boolean caseSensitive() {
     return confParser
         .booleanConf()
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 54f64dbfa8..d75b2234d7 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -32,6 +32,18 @@ public class FlinkReadOptions {
   public static final ConfigOption<Long> SNAPSHOT_ID =
       ConfigOptions.key("snapshot-id").longType().defaultValue(null);
 
+  public static final ConfigOption<String> TAG =
+      ConfigOptions.key("tag").stringType().defaultValue(null);
+
+  public static final ConfigOption<String> BRANCH =
+      ConfigOptions.key("branch").stringType().defaultValue(null);
+
+  public static final ConfigOption<String> START_TAG =
+      ConfigOptions.key("start-tag").stringType().defaultValue(null);
+
+  public static final ConfigOption<String> END_TAG =
+      ConfigOptions.key("end-tag").stringType().defaultValue(null);
+
   public static final String CASE_SENSITIVE = "case-sensitive";
   public static final ConfigOption<Boolean> CASE_SENSITIVE_OPTION =
       ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false);
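
Like snapshot-id, the new keys are registered without the connector prefix,
so they can be supplied programmatically or through SQL hints. A hedged
fragment (the table name t is illustrative):

    Map<String, String> options =
        ImmutableMap.of(FlinkReadOptions.TAG.key(), "v1");
    // SQL hint equivalents:
    //   SELECT * FROM t /*+ OPTIONS('tag'='v1') */
    //   SELECT * FROM t /*+ OPTIONS('start-tag'='v1', 'end-tag'='v2') */
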
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 35004bad38..fa1656c552 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -145,6 +145,16 @@ public class FlinkSource {
       return this;
     }
 
+    public Builder branch(String branch) {
+      readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+      return this;
+    }
+
+    public Builder tag(String tag) {
+      readOptions.put(FlinkReadOptions.TAG.key(), tag);
+      return this;
+    }
+
     public Builder startSnapshotId(Long startSnapshotId) {
       readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
       return this;
@@ -155,6 +165,16 @@ public class FlinkSource {
       return this;
     }
 
+    public Builder startTag(String startTag) {
+      readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+      return this;
+    }
+
+    public Builder endTag(String endTag) {
+      readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+      return this;
+    }
+
     public Builder asOfTimestamp(Long asOfTimestamp) {
       readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
       return this;
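
For incremental reads the new builder methods mirror the snapshot-id pair;
per FlinkSplitPlanner below, start-tag is exclusive and end-tag inclusive. A
hedged fragment (env and loader as in the earlier sketch):

    FlinkSource.forRowData()
        .env(env)
        .tableLoader(loader)
        .startTag("v1")  // exclusive, like start-snapshot-id
        .endTag("v2")    // inclusive, like end-snapshot-id
        .streaming(false)
        .build();
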
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 3ff349dd8b..38a55e437d 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;
 
@@ -86,11 +87,31 @@ public class FlinkSplitPlanner {
       IncrementalAppendScan scan = table.newIncrementalAppendScan();
       scan = refineScanWithBaseConfigs(scan, context, workerPool);
 
+      if (context.startTag() != null) {
+        Preconditions.checkArgument(
+            table.snapshot(context.startTag()) != null,
+            "Cannot find snapshot with tag %s",
+            context.startTag());
+        scan = scan.fromSnapshotExclusive(table.snapshot(context.startTag()).snapshotId());
+      }
+
       if (context.startSnapshotId() != null) {
+        Preconditions.checkArgument(
+            context.startTag() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set");
         scan = scan.fromSnapshotExclusive(context.startSnapshotId());
       }
 
+      if (context.endTag() != null) {
+        Preconditions.checkArgument(
+            table.snapshot(context.endTag()) != null,
+            "Cannot find snapshot with tag %s",
+            context.endTag());
+        scan = scan.toSnapshot(table.snapshot(context.endTag()).snapshotId());
+      }
+
       if (context.endSnapshotId() != null) {
+        Preconditions.checkArgument(
+            context.endTag() == null, "END_SNAPSHOT_ID and END_TAG cannot both be set");
         scan = scan.toSnapshot(context.endSnapshotId());
       }
 
@@ -101,6 +122,10 @@ public class FlinkSplitPlanner {
 
       if (context.snapshotId() != null) {
         scan = scan.useSnapshot(context.snapshotId());
+      } else if (context.tag() != null) {
+        scan = scan.useRef(context.tag());
+      } else if (context.branch() != null) {
+        scan = scan.useRef(context.branch());
       }
 
       if (context.asOfTimestamp() != null) {
@@ -119,7 +144,9 @@ public class FlinkSplitPlanner {
   private static ScanMode checkScanMode(ScanContext context) {
     if (context.isStreaming()
         || context.startSnapshotId() != null
-        || context.endSnapshotId() != null) {
+        || context.endSnapshotId() != null
+        || context.startTag() != null
+        || context.endTag() != null) {
       return ScanMode.INCREMENTAL_APPEND_SCAN;
     } else {
       return ScanMode.BATCH;
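
In batch mode the planner pins the scan with snapshot-id taking precedence
over tag, and tag over branch; both kinds of ref resolve through the same
core call. A minimal core-API equivalent of that last step (hedged; table is
any loaded org.apache.iceberg.Table):

    TableScan scan = table.newScan().useRef("v1");  // "v1" may name a tag or a branch
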
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 4ed74676aa..718460ae8c 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -278,6 +278,26 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    public Builder<T> tag(String tag) {
+      readOptions.put(FlinkReadOptions.TAG.key(), tag);
+      return this;
+    }
+
+    public Builder<T> branch(String branch) {
+      readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+      return this;
+    }
+
+    public Builder<T> startTag(String startTag) {
+      readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+      return this;
+    }
+
+    public Builder<T> endTag(String endTag) {
+      readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+      return this;
+    }
+
     public Builder<T> endSnapshotId(Long newEndSnapshotId) {
       if (newEndSnapshotId != null) {
         readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
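
The FLIP-27 builder gains the same four methods; a hedged fragment (loader
setup as in the earlier sketch, with the usual no-op watermark strategy):

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .branch("b1")  // or .tag(...), .startTag(...), .endTag(...)
            .build();
    DataStream<RowData> rows =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-source");
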
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 02c4943fe9..23f33e6d2e 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -42,11 +42,15 @@ public class ScanContext implements Serializable {
   private final boolean caseSensitive;
   private final boolean exposeLocality;
   private final Long snapshotId;
+  private final String branch;
+  private final String tag;
   private final StreamingStartingStrategy startingStrategy;
   private final Long startSnapshotId;
   private final Long startSnapshotTimestamp;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
+  private final String startTag;
+  private final String endTag;
   private final Long splitSize;
   private final Integer splitLookback;
   private final Long splitOpenFileCost;
@@ -81,14 +85,22 @@ public class ScanContext implements Serializable {
       boolean includeColumnStats,
       boolean exposeLocality,
       Integer planParallelism,
-      int maxPlanningSnapshotCount) {
+      int maxPlanningSnapshotCount,
+      String branch,
+      String tag,
+      String startTag,
+      String endTag) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
+    this.tag = tag;
+    this.branch = branch;
     this.startingStrategy = startingStrategy;
     this.startSnapshotTimestamp = startSnapshotTimestamp;
     this.startSnapshotId = startSnapshotId;
     this.endSnapshotId = endSnapshotId;
     this.asOfTimestamp = asOfTimestamp;
+    this.startTag = startTag;
+    this.endTag = endTag;
     this.splitSize = splitSize;
     this.splitLookback = splitLookback;
     this.splitOpenFileCost = splitOpenFileCost;
@@ -125,7 +137,24 @@ public class ScanContext implements Serializable {
             startSnapshotId == null,
             "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
       }
+
+      Preconditions.checkArgument(
+          branch == null,
+          String.format(
+              "Cannot scan table using ref %s configured for streaming reader yet", branch));
+
+      Preconditions.checkArgument(
+          tag == null,
+          String.format("Cannot scan table using ref %s configured for streaming reader", tag));
     }
+
+    Preconditions.checkArgument(
+        !(startTag != null && startSnapshotId() != null),
+        "START_SNAPSHOT_ID and START_TAG cannot both be set.");
+
+    Preconditions.checkArgument(
+        !(endTag != null && endSnapshotId() != null),
+        "END_SNAPSHOT_ID and END_TAG cannot both be set.");
   }
 
   public boolean caseSensitive() {
@@ -136,6 +165,22 @@ public class ScanContext implements Serializable {
     return snapshotId;
   }
 
+  public String branch() {
+    return branch;
+  }
+
+  public String tag() {
+    return tag;
+  }
+
+  public String startTag() {
+    return startTag;
+  }
+
+  public String endTag() {
+    return endTag;
+  }
+
   public StreamingStartingStrategy streamingStartingStrategy() {
     return startingStrategy;
   }
@@ -212,8 +257,12 @@ public class ScanContext implements Serializable {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(null)
+        .useBranch(branch)
+        .useTag(null)
         .startSnapshotId(newStartSnapshotId)
         .endSnapshotId(newEndSnapshotId)
+        .startTag(null)
+        .endTag(null)
         .asOfTimestamp(null)
         .splitSize(splitSize)
         .splitLookback(splitLookback)
@@ -235,8 +284,12 @@ public class ScanContext implements Serializable {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
         .useSnapshotId(newSnapshotId)
+        .useBranch(branch)
+        .useTag(tag)
         .startSnapshotId(null)
         .endSnapshotId(null)
+        .startTag(null)
+        .endTag(null)
         .asOfTimestamp(null)
         .splitSize(splitSize)
         .splitLookback(splitLookback)
@@ -261,6 +314,10 @@ public class ScanContext implements Serializable {
   public static class Builder {
     private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
     private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
+    private String branch = FlinkReadOptions.BRANCH.defaultValue();
+    private String tag = FlinkReadOptions.TAG.defaultValue();
+    private String startTag = FlinkReadOptions.START_TAG.defaultValue();
+    private String endTag = FlinkReadOptions.END_TAG.defaultValue();
     private StreamingStartingStrategy startingStrategy =
         FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue();
     private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue();
@@ -297,6 +354,16 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder useTag(String newTag) {
+      this.tag = newTag;
+      return this;
+    }
+
+    public Builder useBranch(String newBranch) {
+      this.branch = newBranch;
+      return this;
+    }
+
     public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
       this.startingStrategy = newStartingStrategy;
       return this;
@@ -317,6 +384,16 @@ public class ScanContext implements Serializable {
       return this;
     }
 
+    public Builder startTag(String newStartTag) {
+      this.startTag = newStartTag;
+      return this;
+    }
+
+    public Builder endTag(String newEndTag) {
+      this.endTag = newEndTag;
+      return this;
+    }
+
     public Builder asOfTimestamp(Long newAsOfTimestamp) {
       this.asOfTimestamp = newAsOfTimestamp;
       return this;
@@ -392,6 +469,10 @@ public class ScanContext implements Serializable {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
 
       return this.useSnapshotId(flinkReadConf.snapshotId())
+          .useTag(flinkReadConf.tag())
+          .useBranch(flinkReadConf.branch())
+          .startTag(flinkReadConf.startTag())
+          .endTag(flinkReadConf.endTag())
           .caseSensitive(flinkReadConf.caseSensitive())
           .asOfTimestamp(flinkReadConf.asOfTimestamp())
           .startingStrategy(flinkReadConf.startingStrategy())
@@ -431,7 +512,11 @@ public class ScanContext implements Serializable {
           includeColumnStats,
           exposeLocality,
           planParallelism,
-          maxPlanningSnapshotCount);
+          maxPlanningSnapshotCount,
+          branch,
+          tag,
+          startTag,
+          endTag);
     }
   }
 }
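
The constructor-time checks keep the option pairs mutually exclusive and, for
now, keep refs out of streaming mode; note also that copyWithAppendsBetween
carries the branch forward but clears the tag and tag range, since increments
are tracked by snapshot id. A hedged illustration of what the new validation
rejects:

    // throws IllegalArgumentException: START_SNAPSHOT_ID and START_TAG cannot both be set.
    ScanContext.builder().startTag("v1").startSnapshotId(1L).build();

    // throws IllegalArgumentException: refs are not scannable by the streaming reader yet
    ScanContext.builder().streaming(true).useTag("v1").build();
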
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index 75791c95bd..c27e29613f 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -87,6 +87,8 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
     Preconditions.checkArgument(
         scanContext.endSnapshotId() == null,
         "Cannot set end-snapshot-id option for streaming reader");
+    Preconditions.checkArgument(
+        scanContext.endTag() == null, "Cannot set end-tag option for streaming reader");
     Preconditions.checkArgument(
         scanContext.maxPlanningSnapshotCount() > 0,
         "The max-planning-snapshot-count must be greater than zero");
@@ -124,17 +126,34 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
     if (context.isRestored()) {
       LOG.info("Restoring state for the {}.", getClass().getSimpleName());
       lastSnapshotId = lastSnapshotIdState.get().iterator().next();
-    } else if (scanContext.startSnapshotId() != null) {
+    } else if (scanContext.startTag() != null || scanContext.startSnapshotId() != null) {
+      Preconditions.checkArgument(
+          !(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
+          "START_SNAPSHOT_ID and START_TAG cannot both be set.");
+      Preconditions.checkArgument(
+          scanContext.branch() == null,
+          "Cannot scan table using ref %s configured for streaming reader yet.");
       Preconditions.checkNotNull(
           table.currentSnapshot(), "Don't have any available snapshot in table.");
 
+      long startSnapshotId;
+      if (scanContext.startTag() != null) {
+        Preconditions.checkArgument(
+            table.snapshot(scanContext.startTag()) != null,
+            "Cannot find snapshot with tag %s in table.",
+            scanContext.startTag());
+        startSnapshotId = table.snapshot(scanContext.startTag()).snapshotId();
+      } else {
+        startSnapshotId = scanContext.startSnapshotId();
+      }
+
       long currentSnapshotId = table.currentSnapshot().snapshotId();
       Preconditions.checkState(
-          SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
+          SnapshotUtil.isAncestorOf(table, currentSnapshotId, startSnapshotId),
           "The option start-snapshot-id %s is not an ancestor of the current snapshot.",
-          scanContext.startSnapshotId());
+          startSnapshotId);
 
-      lastSnapshotId = scanContext.startSnapshotId();
+      lastSnapshotId = startSnapshotId;
     }
   }
 
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 5e4154490f..a6cdc212b7 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -229,6 +230,149 @@ public abstract class TestFlinkScan {
         TestFixtures.SCHEMA);
   }
 
+  @Test
+  public void testTagReads() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    List<Record> expectedRecords1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    helper.appendToTable(expectedRecords1);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    table.manageSnapshots().createTag("t1", snapshotId).commit();
+
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords1, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    helper.appendToTable(expectedRecords2);
+    snapshotId = table.currentSnapshot().snapshotId();
+
+    table.manageSnapshots().replaceTag("t1", snapshotId).commit();
+
+    List<Record> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(expectedRecords1);
+    expectedRecords.addAll(expectedRecords2);
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords, TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testBranchReads() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    List<Record> expectedRecordsBase = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    helper.appendToTable(expectedRecordsBase);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotId).commit();
+
+    List<Record> expectedRecordsForBranch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    helper.appendToTable(branchName, expectedRecordsForBranch);
+
+    List<Record> expectedRecordsForMain = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    helper.appendToTable(expectedRecordsForMain);
+
+    List<Record> branchExpectedRecords = Lists.newArrayList();
+    branchExpectedRecords.addAll(expectedRecordsBase);
+    branchExpectedRecords.addAll(expectedRecordsForBranch);
+
+    TestHelpers.assertRecords(
+        runWithOptions(ImmutableMap.of("branch", branchName)),
+        branchExpectedRecords,
+        TestFixtures.SCHEMA);
+
+    List<Record> mainExpectedRecords = Lists.newArrayList();
+    mainExpectedRecords.addAll(expectedRecordsBase);
+    mainExpectedRecords.addAll(expectedRecordsForMain);
+
+    TestHelpers.assertRecords(run(), mainExpectedRecords, TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testIncrementalReadViaTag() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+    helper.appendToTable(records1);
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+    String startTag = "t1";
+    table.manageSnapshots().createTag(startTag, snapshotId1).commit();
+
+    List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
+    helper.appendToTable(records2);
+
+    List<Record> records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L);
+    helper.appendToTable(records3);
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+    String endTag = "t2";
+    table.manageSnapshots().createTag(endTag, snapshotId3).commit();
+
+    helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L));
+
+    List<Record> expected = Lists.newArrayList();
+    expected.addAll(records2);
+    expected.addAll(records3);
+
+    TestHelpers.assertRecords(
+        runWithOptions(
+            ImmutableMap.<String, String>builder()
+                .put("start-tag", startTag)
+                .put("end-tag", endTag)
+                .buildOrThrow()),
+        expected,
+        TestFixtures.SCHEMA);
+
+    TestHelpers.assertRecords(
+        runWithOptions(
+            ImmutableMap.<String, String>builder()
+                .put("start-snapshot-id", Long.toString(snapshotId1))
+                .put("end-tag", endTag)
+                .buildOrThrow()),
+        expected,
+        TestFixtures.SCHEMA);
+
+    TestHelpers.assertRecords(
+        runWithOptions(
+            ImmutableMap.<String, String>builder()
+                .put("start-tag", startTag)
+                .put("end-snapshot-id", Long.toString(snapshotId3))
+                .buildOrThrow()),
+        expected,
+        TestFixtures.SCHEMA);
+
+    AssertHelpers.assertThrows(
+        "START_SNAPSHOT_ID and START_TAG cannot both be set.",
+        Exception.class,
+        () ->
+            runWithOptions(
+                ImmutableMap.<String, String>builder()
+                    .put("start-tag", startTag)
+                    .put("end-tag", endTag)
+                    .put("start-snapshot-id", Long.toString(snapshotId1))
+                    .buildOrThrow()));
+
+    AssertHelpers.assertThrows(
+        "END_SNAPSHOT_ID and END_TAG cannot both be set.",
+        Exception.class,
+        () ->
+            runWithOptions(
+                ImmutableMap.<String, String>builder()
+                    .put("start-tag", startTag)
+                    .put("end-tag", endTag)
+                    .put("end-snapshot-id", Long.toString(snapshotId3))
+                    .buildOrThrow()));
+  }
+
   @Test
   public void testIncrementalRead() throws Exception {
     Table table =
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index 64c84bbf5c..cebce61c08 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -62,6 +62,10 @@ public abstract class TestFlinkSource extends TestFlinkScan {
     FlinkSource.Builder builder = FlinkSource.forRowData();
     Optional.ofNullable(options.get("snapshot-id"))
         .ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
+    Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value));
+    Optional.ofNullable(options.get("branch")).ifPresent(value -> builder.branch(value));
+    Optional.ofNullable(options.get("start-tag")).ifPresent(value -> builder.startTag(value));
+    Optional.ofNullable(options.get("end-tag")).ifPresent(value -> builder.endTag(value));
     Optional.ofNullable(options.get("start-snapshot-id"))
         .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
     Optional.ofNullable(options.get("end-snapshot-id"))
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 10fa4ecf13..abcce11e36 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
@@ -211,6 +212,24 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     result.getJobClient().ifPresent(JobClient::cancel);
   }
 
+  @Test
+  public void testConsumeFilesWithBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+    insertRows(table, row1, row2);
+
+    AssertHelpers.assertThrows(
+        "Cannot scan table using ref for stream yet",
+        IllegalArgumentException.class,
+        "Cannot scan table using ref",
+        () ->
+            exec(
+                "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
+                TABLE));
+  }
+
   @Test
   public void testConsumeFromStartSnapshotId() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
@@ -234,7 +253,47 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
                 + "'start-snapshot-id'='%d')*/",
             TABLE, startSnapshotId);
     try (CloseableIterator<Row> iterator = result.collect()) {
-      // The row2 in start snapshot will be excluded.
+      // The start snapshot (row2) is exclusive.
+      assertRows(ImmutableList.of(row3, row4), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @Test
+  public void testConsumeFromStartTag() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots.
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+    insertRows(table, row1);
+    insertRows(table, row2);
+
+    String tagName = "t1";
+    long startSnapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(tagName, startSnapshotId).commit();
+
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+    insertRows(table, row3, row4);
+
+    TableResult result =
+        exec(
+            "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', "
+                + "'start-tag'='%s')*/",
+            TABLE, tagName);
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // The start snapshot (row2) is exclusive.
       assertRows(ImmutableList.of(row3, row4), iterator);
 
       Row row5 = Row.of(5, "eee", "2021-01-01");
@@ -247,5 +306,15 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
       assertRows(ImmutableList.of(row7), iterator);
     }
     result.getJobClient().ifPresent(JobClient::cancel);
+
+    AssertHelpers.assertThrows(
+        "START_SNAPSHOT_ID and START_TAG cannot both be set.",
+        IllegalArgumentException.class,
+        "START_SNAPSHOT_ID and START_TAG cannot both be set.",
+        () ->
+            exec(
+                "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', "
+                    + "'start-snapshot-id'='%d' )*/",
+                TABLE, tagName, startSnapshotId));
   }
 }
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 6f8789c92b..a161645979 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -161,6 +161,42 @@ public class TestStreamingMonitorFunction extends TableTestBase {
     }
   }
 
+  @Test
+  public void testConsumeFromStartTag() throws Exception {
+    // Commit the first five transactions.
+    generateRecordsAndCommitTxn(5);
+    long startSnapshotId = table.currentSnapshot().snapshotId();
+    String tagName = "t1";
+    table.manageSnapshots().createTag(tagName, startSnapshotId).commit();
+
+    // Commit the next five transactions.
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(5);
+
+    ScanContext scanContext =
+        ScanContext.builder().monitorInterval(Duration.ofMillis(100)).startTag(tagName).build();
+
+    StreamingMonitorFunction function = createFunction(scanContext);
+    try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runSourceFunctionInTask(sourceContext, function);
+
+      Assert.assertTrue(
+          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+      Thread.sleep(1000L);
+
+      // Stop the stream task.
+      function.close();
+
+      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+      TestHelpers.assertRecords(
+          sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+    }
+  }
+
   @Test
   public void testCheckpointRestore() throws Exception {
     List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);