You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/16 03:58:36 UTC
[iceberg] branch master updated: Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b78d336163 Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
b78d336163 is described below
commit b78d3361635e924ed23fd8fe87b5f966b1953dd9
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Sun Apr 16 11:58:29 2023 +0800
Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
---
.../iceberg/flink/source/FlinkSplitPlanner.java | 10 +++--
.../enumerator/ContinuousSplitPlannerImpl.java | 4 +-
.../flink/source/TestIcebergSourceContinuous.java | 51 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 38a55e437d..ea317e93d8 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;
@@ -136,14 +137,15 @@ public class FlinkSplitPlanner {
}
}
- private enum ScanMode {
+ @VisibleForTesting
+ enum ScanMode { // package-private (previously private) so tests can assert the result of checkScanMode
BATCH,
INCREMENTAL_APPEND_SCAN
}
- private static ScanMode checkScanMode(ScanContext context) {
- if (context.isStreaming()
- || context.startSnapshotId() != null
+ @VisibleForTesting
+ static ScanMode checkScanMode(ScanContext context) {
+ if (context.startSnapshotId() != null
|| context.endSnapshotId() != null
|| context.startTag() != null
|| context.endTag() != null) {
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 85ecaaee1f..f0d8ca8d70 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -166,7 +166,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
if (scanContext.streamingStartingStrategy()
== StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
// do a batch table scan first
- splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
+ splits =
+ FlinkSplitPlanner.planIcebergSourceSplits(
+ table, scanContext.copyWithSnapshotId(startSnapshot.snapshotId()), workerPool);
LOG.info(
"Discovered {} splits from initial batch table scan with snapshot Id {}",
splits.size(),
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 582a125233..6d26f933b3 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -111,6 +112,56 @@ public class TestIcebergSourceContinuous {
}
}
+ @Test
+ public void testTableScanThenIncrementalAfterExpiration() throws Exception {
+ GenericAppenderHelper dataAppender =
+ new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // snapshot1: expired below, so it is no longer in the table history when the source starts
+ List<Record> batch1 =
+ RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+ long snapshotId = tableResource.table().currentSnapshot().snapshotId();
+
+ // snapshot2: becomes the current and only remaining snapshot; still contains batch1's data
+ List<Record> batch2 =
+ RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+
+ tableResource.table().expireSnapshots().expireSnapshotId(snapshotId).commit();
+
+ Assert.assertEquals("Only snapshot2 should remain in history", 1, tableResource.table().history().size());
+
+ ScanContext scanContext =
+ ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+ .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+
+ Assert.assertEquals("A streaming context without start/end bounds should plan as BATCH",
+ FlinkSplitPlanner.ScanMode.BATCH, FlinkSplitPlanner.checkScanMode(scanContext));
+
+ try (CloseableIterator<Row> iter =
+ createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> result1 = waitForResult(iter, 4);
+ List<Record> initialRecords = Lists.newArrayList();
+ initialRecords.addAll(batch1);
+ initialRecords.addAll(batch2);
+ TestHelpers.assertRecords(result1, initialRecords, tableResource.table().schema());
+
+ // snapshot3: appended while the job is running, after the initial table scan
+ List<Record> batch3 =
+ RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+ // the incremental phase should now emit exactly batch3's records
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+ }
+ }
+
@Test
public void testEarliestSnapshot() throws Exception {
GenericAppenderHelper dataAppender =