Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/02/14 15:45:16 UTC
[iceberg] branch master updated: Flink: use tag or branch to scan data (#5029)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6cd3d240fb Flink: use tag or branch to scan data (#5029)
6cd3d240fb is described below
commit 6cd3d240fb0492ffb8164022bfd9b382c33d1c7f
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Tue Feb 14 23:45:07 2023 +0800
Flink: use tag or branch to scan data (#5029)
---
.../apache/iceberg/data/GenericAppenderHelper.java | 21 ++-
.../org/apache/iceberg/flink/FlinkReadConf.java | 16 +++
.../org/apache/iceberg/flink/FlinkReadOptions.java | 12 ++
.../apache/iceberg/flink/source/FlinkSource.java | 20 +++
.../iceberg/flink/source/FlinkSplitPlanner.java | 29 ++++-
.../apache/iceberg/flink/source/IcebergSource.java | 20 +++
.../apache/iceberg/flink/source/ScanContext.java | 89 ++++++++++++-
.../flink/source/StreamingMonitorFunction.java | 27 +++-
.../apache/iceberg/flink/source/TestFlinkScan.java | 144 +++++++++++++++++++++
.../iceberg/flink/source/TestFlinkSource.java | 4 +
.../iceberg/flink/source/TestStreamScanSql.java | 71 +++++++++-
.../flink/source/TestStreamingMonitorFunction.java | 36 ++++++
12 files changed, 478 insertions(+), 11 deletions(-)
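Before the per-file diffs, a hedged sketch of the surface this commit adds, end to end. The block below is illustrative rather than committed code; env (a StreamExecutionEnvironment) and tableLoader (a TableLoader) are assumed from the usual Flink/Iceberg setup, and the builder methods are the ones introduced in FlinkSource below.

    // Batch-read the snapshot a tag points to; branch("b1") works the same way.
    DataStream<RowData> atTag =
        FlinkSource.forRowData().env(env).tableLoader(tableLoader).tag("t1").build();

    // Incremental append scan between two tags (start is exclusive, end inclusive).
    DataStream<RowData> betweenTags =
        FlinkSource.forRowData()
            .env(env)
            .tableLoader(tableLoader)
            .startTag("t1")
            .endTag("t2")
            .build();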
diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index 96d0a96c72..eb8aefb046 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileAppender;
@@ -56,10 +57,11 @@ public class GenericAppenderHelper {
this(table, fileFormat, tmp, null);
}
- public void appendToTable(DataFile... dataFiles) {
+ public void appendToTable(String branch, DataFile... dataFiles) {
Preconditions.checkNotNull(table, "table not set");
- AppendFiles append = table.newAppend();
+ AppendFiles append =
+ table.newAppend().toBranch(branch != null ? branch : SnapshotRef.MAIN_BRANCH);
for (DataFile dataFile : dataFiles) {
append = append.appendFile(dataFile);
@@ -68,8 +70,21 @@ public class GenericAppenderHelper {
append.commit();
}
+ public void appendToTable(DataFile... dataFiles) {
+ appendToTable(null, dataFiles);
+ }
+
public void appendToTable(List<Record> records) throws IOException {
- appendToTable(null, records);
+ appendToTable(null, null, records);
+ }
+
+ public void appendToTable(String branch, List<Record> records) throws IOException {
+ appendToTable(null, branch, records);
+ }
+
+ public void appendToTable(StructLike partition, String branch, List<Record> records)
+ throws IOException {
+ appendToTable(branch, writeFile(partition, records));
}
public void appendToTable(StructLike partition, List<Record> records) throws IOException {
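A short usage sketch of the new helper overloads (mirroring the tests later in this commit; table, records, and tmp are assumed test fixtures):

    GenericAppenderHelper helper = new GenericAppenderHelper(table, FileFormat.PARQUET, tmp);
    helper.appendToTable(records);        // unchanged: commits to the main branch
    helper.appendToTable("b1", records);  // new: commits the appended files to branch "b1"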
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index e2cacc2adf..baef57a8e7 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -39,6 +39,22 @@ public class FlinkReadConf {
return confParser.longConf().option(FlinkReadOptions.SNAPSHOT_ID.key()).parseOptional();
}
+ public String tag() {
+ return confParser.stringConf().option(FlinkReadOptions.TAG.key()).parseOptional();
+ }
+
+ public String startTag() {
+ return confParser.stringConf().option(FlinkReadOptions.START_TAG.key()).parseOptional();
+ }
+
+ public String endTag() {
+ return confParser.stringConf().option(FlinkReadOptions.END_TAG.key()).parseOptional();
+ }
+
+ public String branch() {
+ return confParser.stringConf().option(FlinkReadOptions.BRANCH.key()).parseOptional();
+ }
+
public boolean caseSensitive() {
return confParser
.booleanConf()
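The new getters delegate to the same conf parser as the existing ones, so 'tag', 'branch', 'start-tag', and 'end-tag' resolve with the same precedence as the other read options. A hedged sketch, mirroring how ScanContext.fromProperties wires this up further below:

    FlinkReadConf readConf = new FlinkReadConf(table, readOptions, readableConfig);
    String tag = readConf.tag();        // null when 'tag' was not set anywhere
    String branch = readConf.branch();  // likewise optional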
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 54f64dbfa8..d75b2234d7 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -32,6 +32,18 @@ public class FlinkReadOptions {
public static final ConfigOption<Long> SNAPSHOT_ID =
ConfigOptions.key("snapshot-id").longType().defaultValue(null);
+ public static final ConfigOption<String> TAG =
+ ConfigOptions.key("tag").stringType().defaultValue(null);
+
+ public static final ConfigOption<String> BRANCH =
+ ConfigOptions.key("branch").stringType().defaultValue(null);
+
+ public static final ConfigOption<String> START_TAG =
+ ConfigOptions.key("start-tag").stringType().defaultValue(null);
+
+ public static final ConfigOption<String> END_TAG =
+ ConfigOptions.key("end-tag").stringType().defaultValue(null);
+
public static final String CASE_SENSITIVE = "case-sensitive";
public static final ConfigOption<Boolean> CASE_SENSITIVE_OPTION =
ConfigOptions.key(PREFIX + CASE_SENSITIVE).booleanType().defaultValue(false);
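Like SNAPSHOT_ID above, the four new keys are registered without the option prefix, so they can be passed straight through SQL hints. A hedged example (tEnv is an assumed StreamTableEnvironment; the test classes below exercise the same keys):

    tEnv.executeSql("SELECT * FROM db.tbl /*+ OPTIONS('tag'='t1') */");
    tEnv.executeSql("SELECT * FROM db.tbl /*+ OPTIONS('start-tag'='t1', 'end-tag'='t2') */");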
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 35004bad38..fa1656c552 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -145,6 +145,16 @@ public class FlinkSource {
return this;
}
+ public Builder branch(String branch) {
+ readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+ return this;
+ }
+
+ public Builder tag(String tag) {
+ readOptions.put(FlinkReadOptions.TAG.key(), tag);
+ return this;
+ }
+
public Builder startSnapshotId(Long startSnapshotId) {
readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
return this;
@@ -155,6 +165,16 @@ public class FlinkSource {
return this;
}
+ public Builder startTag(String startTag) {
+ readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+ return this;
+ }
+
+ public Builder endTag(String endTag) {
+ readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+ return this;
+ }
+
public Builder asOfTimestamp(Long asOfTimestamp) {
readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
return this;
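Each new builder method is a thin wrapper that writes the corresponding FlinkReadOptions key into the read-options map, so builder calls and raw read options stay equivalent. For example (sketch, with env and tableLoader assumed):

    DataStream<RowData> fromBranch =
        FlinkSource.forRowData().env(env).tableLoader(tableLoader).branch("b1").build();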
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 3ff349dd8b..38a55e437d 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;
@@ -86,11 +87,31 @@ public class FlinkSplitPlanner {
IncrementalAppendScan scan = table.newIncrementalAppendScan();
scan = refineScanWithBaseConfigs(scan, context, workerPool);
+ if (context.startTag() != null) {
+ Preconditions.checkArgument(
+ table.snapshot(context.startTag()) != null,
+ "Cannot find snapshot with tag %s",
+ context.startTag());
+ scan = scan.fromSnapshotExclusive(table.snapshot(context.startTag()).snapshotId());
+ }
+
if (context.startSnapshotId() != null) {
+ Preconditions.checkArgument(
+ context.startTag() == null, "START_SNAPSHOT_ID and START_TAG cannot both be set");
scan = scan.fromSnapshotExclusive(context.startSnapshotId());
}
+ if (context.endTag() != null) {
+ Preconditions.checkArgument(
+ table.snapshot(context.endTag()) != null,
+ "Cannot find snapshot with tag %s",
+ context.endTag());
+ scan = scan.toSnapshot(table.snapshot(context.endTag()).snapshotId());
+ }
+
if (context.endSnapshotId() != null) {
+ Preconditions.checkArgument(
+ context.endTag() == null, "END_SNAPSHOT_ID and END_TAG cannot both be set");
scan = scan.toSnapshot(context.endSnapshotId());
}
@@ -101,6 +122,10 @@ public class FlinkSplitPlanner {
if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
+ } else if (context.tag() != null) {
+ scan = scan.useRef(context.tag());
+ } else if (context.branch() != null) {
+ scan = scan.useRef(context.branch());
}
if (context.asOfTimestamp() != null) {
@@ -119,7 +144,9 @@ public class FlinkSplitPlanner {
private static ScanMode checkScanMode(ScanContext context) {
if (context.isStreaming()
|| context.startSnapshotId() != null
- || context.endSnapshotId() != null) {
+ || context.endSnapshotId() != null
+ || context.startTag() != null
+ || context.endTag() != null) {
return ScanMode.INCREMENTAL_APPEND_SCAN;
} else {
return ScanMode.BATCH;
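Two details worth noting in the planner change: Table.snapshot(String) resolves a ref name (tag or branch) to its Snapshot, which is why a plain null check suffices, and the start boundary is exclusive while the end boundary is inclusive. A standalone sketch of the same resolution (illustrative; assumes a tag named "t1" exists):

    Snapshot tagged = table.snapshot("t1");  // null when no ref named "t1" exists
    Preconditions.checkArgument(tagged != null, "Cannot find snapshot with tag %s", "t1");
    IncrementalAppendScan scan =
        table.newIncrementalAppendScan().fromSnapshotExclusive(tagged.snapshotId());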
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 4ed74676aa..718460ae8c 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -278,6 +278,26 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
return this;
}
+ public Builder<T> tag(String tag) {
+ readOptions.put(FlinkReadOptions.TAG.key(), tag);
+ return this;
+ }
+
+ public Builder<T> branch(String branch) {
+ readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+ return this;
+ }
+
+ public Builder<T> startTag(String startTag) {
+ readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+ return this;
+ }
+
+ public Builder<T> endTag(String endTag) {
+ readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+ return this;
+ }
+
public Builder<T> endSnapshotId(Long newEndSnapshotId) {
if (newEndSnapshotId != null) {
readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
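The FLIP-27 source gets the same four knobs. A hedged sketch of wiring it into a job; the fromSource call is the standard Source API entry point, and the surrounding names are assumptions rather than part of this commit:

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .startTag("t1")  // exclusive start, resolved via the planner change above
            .endTag("t2")    // inclusive end
            .build();
    DataStream<RowData> stream =
        env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "iceberg-source",
            TypeInformation.of(RowData.class));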
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 02c4943fe9..23f33e6d2e 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -42,11 +42,15 @@ public class ScanContext implements Serializable {
private final boolean caseSensitive;
private final boolean exposeLocality;
private final Long snapshotId;
+ private final String branch;
+ private final String tag;
private final StreamingStartingStrategy startingStrategy;
private final Long startSnapshotId;
private final Long startSnapshotTimestamp;
private final Long endSnapshotId;
private final Long asOfTimestamp;
+ private final String startTag;
+ private final String endTag;
private final Long splitSize;
private final Integer splitLookback;
private final Long splitOpenFileCost;
@@ -81,14 +85,22 @@ public class ScanContext implements Serializable {
boolean includeColumnStats,
boolean exposeLocality,
Integer planParallelism,
- int maxPlanningSnapshotCount) {
+ int maxPlanningSnapshotCount,
+ String branch,
+ String tag,
+ String startTag,
+ String endTag) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
+ this.tag = tag;
+ this.branch = branch;
this.startingStrategy = startingStrategy;
this.startSnapshotTimestamp = startSnapshotTimestamp;
this.startSnapshotId = startSnapshotId;
this.endSnapshotId = endSnapshotId;
this.asOfTimestamp = asOfTimestamp;
+ this.startTag = startTag;
+ this.endTag = endTag;
this.splitSize = splitSize;
this.splitLookback = splitLookback;
this.splitOpenFileCost = splitOpenFileCost;
@@ -125,7 +137,24 @@ public class ScanContext implements Serializable {
startSnapshotId == null,
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
}
+
+ Preconditions.checkArgument(
+ branch == null,
+ "Cannot scan table using ref %s configured for streaming reader yet",
+ branch);
+
+ Preconditions.checkArgument(
+ tag == null,
+ "Cannot scan table using ref %s configured for streaming reader", tag);
}
+
+ Preconditions.checkArgument(
+ !(startTag != null && startSnapshotId() != null),
+ "START_SNAPSHOT_ID and START_TAG cannot both be set.");
+
+ Preconditions.checkArgument(
+ !(endTag != null && endSnapshotId() != null),
+ "END_SNAPSHOT_ID and END_TAG cannot both be set.");
}
public boolean caseSensitive() {
@@ -136,6 +165,22 @@ public class ScanContext implements Serializable {
return snapshotId;
}
+ public String branch() {
+ return branch;
+ }
+
+ public String tag() {
+ return tag;
+ }
+
+ public String startTag() {
+ return startTag;
+ }
+
+ public String endTag() {
+ return endTag;
+ }
+
public StreamingStartingStrategy streamingStartingStrategy() {
return startingStrategy;
}
@@ -212,8 +257,12 @@ public class ScanContext implements Serializable {
return ScanContext.builder()
.caseSensitive(caseSensitive)
.useSnapshotId(null)
+ .useBranch(branch)
+ .useTag(null)
.startSnapshotId(newStartSnapshotId)
.endSnapshotId(newEndSnapshotId)
+ .startTag(null)
+ .endTag(null)
.asOfTimestamp(null)
.splitSize(splitSize)
.splitLookback(splitLookback)
@@ -235,8 +284,12 @@ public class ScanContext implements Serializable {
return ScanContext.builder()
.caseSensitive(caseSensitive)
.useSnapshotId(newSnapshotId)
+ .useBranch(branch)
+ .useTag(tag)
.startSnapshotId(null)
.endSnapshotId(null)
+ .startTag(null)
+ .endTag(null)
.asOfTimestamp(null)
.splitSize(splitSize)
.splitLookback(splitLookback)
@@ -261,6 +314,10 @@ public class ScanContext implements Serializable {
public static class Builder {
private boolean caseSensitive = FlinkReadOptions.CASE_SENSITIVE_OPTION.defaultValue();
private Long snapshotId = FlinkReadOptions.SNAPSHOT_ID.defaultValue();
+ private String branch = FlinkReadOptions.BRANCH.defaultValue();
+ private String tag = FlinkReadOptions.TAG.defaultValue();
+ private String startTag = FlinkReadOptions.START_TAG.defaultValue();
+ private String endTag = FlinkReadOptions.END_TAG.defaultValue();
private StreamingStartingStrategy startingStrategy =
FlinkReadOptions.STARTING_STRATEGY_OPTION.defaultValue();
private Long startSnapshotTimestamp = FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.defaultValue();
@@ -297,6 +354,16 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder useTag(String newTag) {
+ this.tag = newTag;
+ return this;
+ }
+
+ public Builder useBranch(String newBranch) {
+ this.branch = newBranch;
+ return this;
+ }
+
public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
this.startingStrategy = newStartingStrategy;
return this;
@@ -317,6 +384,16 @@ public class ScanContext implements Serializable {
return this;
}
+ public Builder startTag(String newStartTag) {
+ this.startTag = newStartTag;
+ return this;
+ }
+
+ public Builder endTag(String newEndTag) {
+ this.endTag = newEndTag;
+ return this;
+ }
+
public Builder asOfTimestamp(Long newAsOfTimestamp) {
this.asOfTimestamp = newAsOfTimestamp;
return this;
@@ -392,6 +469,10 @@ public class ScanContext implements Serializable {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
return this.useSnapshotId(flinkReadConf.snapshotId())
+ .useTag(flinkReadConf.tag())
+ .useBranch(flinkReadConf.branch())
+ .startTag(flinkReadConf.startTag())
+ .endTag(flinkReadConf.endTag())
.caseSensitive(flinkReadConf.caseSensitive())
.asOfTimestamp(flinkReadConf.asOfTimestamp())
.startingStrategy(flinkReadConf.startingStrategy())
@@ -431,7 +512,11 @@ public class ScanContext implements Serializable {
includeColumnStats,
exposeLocality,
planParallelism,
- maxPlanningSnapshotCount);
+ maxPlanningSnapshotCount,
+ branch,
+ tag,
+ startTag,
+ endTag);
}
}
}
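A minimal construction sketch matching the test usage later in this commit; build() runs the new mutual-exclusion checks, so setting both startTag and startSnapshotId fails fast:

    ScanContext context =
        ScanContext.builder()
            .monitorInterval(Duration.ofMillis(100))
            .startTag("t1")  // streaming start position by tag
            .build();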
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
index 75791c95bd..c27e29613f 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -87,6 +87,8 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
Preconditions.checkArgument(
scanContext.endSnapshotId() == null,
"Cannot set end-snapshot-id option for streaming reader");
+ Preconditions.checkArgument(
+ scanContext.endTag() == null, "Cannot set end-tag option for streaming reader");
Preconditions.checkArgument(
scanContext.maxPlanningSnapshotCount() > 0,
"The max-planning-snapshot-count must be greater than zero");
@@ -124,17 +126,34 @@ public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit
if (context.isRestored()) {
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
lastSnapshotId = lastSnapshotIdState.get().iterator().next();
- } else if (scanContext.startSnapshotId() != null) {
+ } else if (scanContext.startTag() != null || scanContext.startSnapshotId() != null) {
+ Preconditions.checkArgument(
+ !(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
+ "START_SNAPSHOT_ID and START_TAG cannot both be set.");
+ Preconditions.checkArgument(
+ scanContext.branch() == null,
+ "Cannot scan table using ref %s configured for streaming reader yet.", scanContext.branch());
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in table.");
+ long startSnapshotId;
+ if (scanContext.startTag() != null) {
+ Preconditions.checkArgument(
+ table.snapshot(scanContext.startTag()) != null,
+ "Cannot find snapshot with tag %s in table.",
+ scanContext.startTag());
+ startSnapshotId = table.snapshot(scanContext.startTag()).snapshotId();
+ } else {
+ startSnapshotId = scanContext.startSnapshotId();
+ }
+
long currentSnapshotId = table.currentSnapshot().snapshotId();
Preconditions.checkState(
- SnapshotUtil.isAncestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
+ SnapshotUtil.isAncestorOf(table, currentSnapshotId, startSnapshotId),
"The option start-snapshot-id %s is not an ancestor of the current snapshot.",
- scanContext.startSnapshotId());
+ startSnapshotId);
- lastSnapshotId = scanContext.startSnapshotId();
+ lastSnapshotId = startSnapshotId;
}
}
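End to end, the monitor now resolves start-tag to its snapshot id once, then follows appends exactly as start-snapshot-id does. A hedged SQL-level sketch (the same hint the TestStreamScanSql changes below exercise; tEnv is an assumed StreamTableEnvironment):

    TableResult result =
        tEnv.executeSql(
            "SELECT * FROM db.tbl /*+ OPTIONS('streaming'='true', "
                + "'monitor-interval'='1s', 'start-tag'='t1') */");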
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 5e4154490f..a6cdc212b7 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -229,6 +230,149 @@ public abstract class TestFlinkScan {
TestFixtures.SCHEMA);
}
+ @Test
+ public void testTagReads() throws Exception {
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+ List<Record> expectedRecords1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ helper.appendToTable(expectedRecords1);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ table.manageSnapshots().createTag("t1", snapshotId).commit();
+
+ TestHelpers.assertRecords(
+ runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords1, TestFixtures.SCHEMA);
+
+ List<Record> expectedRecords2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ helper.appendToTable(expectedRecords2);
+ snapshotId = table.currentSnapshot().snapshotId();
+
+ table.manageSnapshots().replaceTag("t1", snapshotId).commit();
+
+ List<Record> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(expectedRecords1);
+ expectedRecords.addAll(expectedRecords2);
+ TestHelpers.assertRecords(
+ runWithOptions(ImmutableMap.of("tag", "t1")), expectedRecords, TestFixtures.SCHEMA);
+ }
+
+ @Test
+ public void testBranchReads() throws Exception {
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+ List<Record> expectedRecordsBase = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ helper.appendToTable(expectedRecordsBase);
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ table.manageSnapshots().createBranch(branchName, snapshotId).commit();
+
+ List<Record> expectedRecordsForBranch = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ helper.appendToTable(branchName, expectedRecordsForBranch);
+
+ List<Record> expectedRecordsForMain = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ helper.appendToTable(expectedRecordsForMain);
+
+ List<Record> branchExpectedRecords = Lists.newArrayList();
+ branchExpectedRecords.addAll(expectedRecordsBase);
+ branchExpectedRecords.addAll(expectedRecordsForBranch);
+
+ TestHelpers.assertRecords(
+ runWithOptions(ImmutableMap.of("branch", branchName)),
+ branchExpectedRecords,
+ TestFixtures.SCHEMA);
+
+ List<Record> mainExpectedRecords = Lists.newArrayList();
+ mainExpectedRecords.addAll(expectedRecordsBase);
+ mainExpectedRecords.addAll(expectedRecordsForMain);
+
+ TestHelpers.assertRecords(run(), mainExpectedRecords, TestFixtures.SCHEMA);
+ }
+
+ @Test
+ public void testIncrementalReadViaTag() throws Exception {
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+ List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ helper.appendToTable(records1);
+ long snapshotId1 = table.currentSnapshot().snapshotId();
+ String startTag = "t1";
+ table.manageSnapshots().createTag(startTag, snapshotId1).commit();
+
+ List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
+ helper.appendToTable(records2);
+
+ List<Record> records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L);
+ helper.appendToTable(records3);
+ long snapshotId3 = table.currentSnapshot().snapshotId();
+ String endTag = "t2";
+ table.manageSnapshots().createTag(endTag, snapshotId3).commit();
+
+ helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L));
+
+ List<Record> expected = Lists.newArrayList();
+ expected.addAll(records2);
+ expected.addAll(records3);
+
+ TestHelpers.assertRecords(
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .buildOrThrow()),
+ expected,
+ TestFixtures.SCHEMA);
+
+ TestHelpers.assertRecords(
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-snapshot-id", Long.toString(snapshotId1))
+ .put("end-tag", endTag)
+ .buildOrThrow()),
+ expected,
+ TestFixtures.SCHEMA);
+
+ TestHelpers.assertRecords(
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-snapshot-id", Long.toString(snapshotId3))
+ .buildOrThrow()),
+ expected,
+ TestFixtures.SCHEMA);
+
+ AssertHelpers.assertThrows(
+ "START_SNAPSHOT_ID and START_TAG cannot both be set.",
+ Exception.class,
+ () ->
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .put("start-snapshot-id", Long.toString(snapshotId1))
+ .buildOrThrow()));
+
+ AssertHelpers.assertThrows(
+ "END_SNAPSHOT_ID and END_TAG cannot both be set.",
+ Exception.class,
+ () ->
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .put("end-snapshot-id", Long.toString(snapshotId3))
+ .buildOrThrow()));
+ }
+
@Test
public void testIncrementalRead() throws Exception {
Table table =
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index 64c84bbf5c..cebce61c08 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -62,6 +62,10 @@ public abstract class TestFlinkSource extends TestFlinkScan {
FlinkSource.Builder builder = FlinkSource.forRowData();
Optional.ofNullable(options.get("snapshot-id"))
.ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
+ Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value));
+ Optional.ofNullable(options.get("branch")).ifPresent(value -> builder.branch(value));
+ Optional.ofNullable(options.get("start-tag")).ifPresent(value -> builder.startTag(value));
+ Optional.ofNullable(options.get("end-tag")).ifPresent(value -> builder.endTag(value));
Optional.ofNullable(options.get("start-snapshot-id"))
.ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
Optional.ofNullable(options.get("end-snapshot-id"))
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 10fa4ecf13..abcce11e36 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
@@ -211,6 +212,24 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
result.getJobClient().ifPresent(JobClient::cancel);
}
+ @Test
+ public void testConsumeFilesWithBranch() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+ insertRows(table, row1, row2);
+
+ AssertHelpers.assertThrows(
+ "Cannot scan table using ref for stream yet",
+ IllegalArgumentException.class,
+ "Cannot scan table using ref",
+ () ->
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
+ TABLE));
+ }
+
@Test
public void testConsumeFromStartSnapshotId() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
@@ -234,7 +253,47 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
+ "'start-snapshot-id'='%d')*/",
TABLE, startSnapshotId);
try (CloseableIterator<Row> iterator = result.collect()) {
- // The row2 in start snapshot will be excluded.
+ // The start snapshot (row2) is exclusive.
+ assertRows(ImmutableList.of(row3, row4), iterator);
+
+ Row row5 = Row.of(5, "eee", "2021-01-01");
+ Row row6 = Row.of(6, "fff", "2021-01-01");
+ insertRows(table, row5, row6);
+ assertRows(ImmutableList.of(row5, row6), iterator);
+
+ Row row7 = Row.of(7, "ggg", "2021-01-01");
+ insertRows(table, row7);
+ assertRows(ImmutableList.of(row7), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @Test
+ public void testConsumeFromStartTag() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ // Produce two snapshots.
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+ insertRows(table, row1);
+ insertRows(table, row2);
+
+ String tagName = "t1";
+ long startSnapshotId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagName, startSnapshotId).commit();
+
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+ insertRows(table, row3, row4);
+
+ TableResult result =
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', "
+ + "'start-tag'='%s')*/",
+ TABLE, tagName);
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ // The start snapshot (row2) is exclusive.
assertRows(ImmutableList.of(row3, row4), iterator);
Row row5 = Row.of(5, "eee", "2021-01-01");
@@ -247,5 +306,15 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
assertRows(ImmutableList.of(row7), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);
+
+ AssertHelpers.assertThrows(
+ "START_SNAPSHOT_ID and START_TAG cannot both be set.",
+ IllegalArgumentException.class,
+ "START_SNAPSHOT_ID and START_TAG cannot both be set.",
+ () ->
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', "
+ + "'start-snapshot-id'='%d' )*/",
+ TABLE, tagName, startSnapshotId));
}
}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index 6f8789c92b..a161645979 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -161,6 +161,42 @@ public class TestStreamingMonitorFunction extends TableTestBase {
}
}
+ @Test
+ public void testConsumeFromStartTag() throws Exception {
+ // Commit the first five transactions.
+ generateRecordsAndCommitTxn(5);
+ long startSnapshotId = table.currentSnapshot().snapshotId();
+ String tagName = "t1";
+ table.manageSnapshots().createTag(tagName, startSnapshotId).commit();
+
+ // Commit the next five transactions.
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(5);
+
+ ScanContext scanContext =
+ ScanContext.builder().monitorInterval(Duration.ofMillis(100)).startTag(tagName).build();
+
+ StreamingMonitorFunction function = createFunction(scanContext);
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ runSourceFunctionInTask(sourceContext, function);
+
+ Assert.assertTrue(
+ "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+ Thread.sleep(1000L);
+
+ // Stop the stream task.
+ function.close();
+
+ Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+ TestHelpers.assertRecords(
+ sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+ }
+ }
+
@Test
public void testCheckpointRestore() throws Exception {
List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);