Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/16 03:58:36 UTC

[iceberg] branch master updated: Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b78d336163 Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
b78d336163 is described below

commit b78d3361635e924ed23fd8fe87b5f966b1953dd9
Author: Chen, Junjie <ji...@tencent.com>
AuthorDate: Sun Apr 16 11:58:29 2023 +0800

    Flink: use correct scan mode when in TABLE_SCAN_THEN_INCREMENTAL mode (#7338)
---
 .../iceberg/flink/source/FlinkSplitPlanner.java    | 10 +++--
 .../enumerator/ContinuousSplitPlannerImpl.java     |  4 +-
 .../flink/source/TestIcebergSourceContinuous.java  | 51 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 5 deletions(-)
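
Context for the change: with StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL, the source is meant to read the whole table once and then switch to incremental append scans. Before this commit, FlinkSplitPlanner.checkScanMode() short-circuited on context.isStreaming(), so even the one-time initial scan was classified as INCREMENTAL_APPEND_SCAN; the regression test added below exercises the case where that goes wrong after an old snapshot has been expired. A minimal sketch of the affected configuration, using only the builder calls that appear in the new test:

    ScanContext scanContext =
        ScanContext.builder()
            .streaming(true)
            .monitorInterval(Duration.ofMillis(10L))
            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
            .build();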

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index 38a55e437d..ea317e93d8 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Tasks;
@@ -136,14 +137,15 @@ public class FlinkSplitPlanner {
     }
   }
 
-  private enum ScanMode {
+  @VisibleForTesting
+  enum ScanMode {
     BATCH,
     INCREMENTAL_APPEND_SCAN
   }
 
-  private static ScanMode checkScanMode(ScanContext context) {
-    if (context.isStreaming()
-        || context.startSnapshotId() != null
+  @VisibleForTesting
+  static ScanMode checkScanMode(ScanContext context) {
+    if (context.startSnapshotId() != null
         || context.endSnapshotId() != null
         || context.startTag() != null
         || context.endTag() != null) {
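With the isStreaming() clause removed, classification now depends only on explicit incremental boundaries (start/end snapshot IDs and tags), and ScanMode/checkScanMode are relaxed from private to package-private with @VisibleForTesting so the new test can assert the outcome directly. For the streaming context sketched above, which sets none of those boundaries:

    // before this commit: INCREMENTAL_APPEND_SCAN (isStreaming() short-circuited)
    // after this commit:  BATCH (no start/end snapshot or tag is set)
    FlinkSplitPlanner.ScanMode mode = FlinkSplitPlanner.checkScanMode(scanContext);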
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 85ecaaee1f..f0d8ca8d70 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -166,7 +166,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
     if (scanContext.streamingStartingStrategy()
         == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
       // do a batch table scan first
-      splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
+      splits =
+          FlinkSplitPlanner.planIcebergSourceSplits(
+              table, scanContext.copyWithSnapshotId(startSnapshot.snapshotId()), workerPool);
       LOG.info(
           "Discovered {} splits from initial batch table scan with snapshot Id {}",
           splits.size(),
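This is the second half of the fix: the initial batch scan is now pinned to the enumerator's start snapshot via copyWithSnapshotId() instead of being planned against whatever snapshot is current at planning time. The copy carries a concrete snapshotId but still no incremental boundaries, so under the checkScanMode() logic above it is planned as a BATCH scan of exactly that snapshot. A sketch of the intent (variable names beyond those in the hunk are illustrative):

    // Pin the one-time table scan to the discovered start snapshot; rows committed
    // after startSnapshot are left for the incremental phase to pick up.
    ScanContext batchContext = scanContext.copyWithSnapshotId(startSnapshot.snapshotId());
    List<IcebergSourceSplit> splits =
        FlinkSplitPlanner.planIcebergSourceSplits(table, batchContext, workerPool);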
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index 582a125233..6d26f933b3 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -111,6 +112,56 @@ public class TestIcebergSourceContinuous {
     }
   }
 
+  @Test
+  public void testTableScanThenIncrementalAfterExpiration() throws Exception {
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // snapshot1
+    List<Record> batch1 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch1);
+    long snapshotId = tableResource.table().currentSnapshot().snapshotId();
+
+    // snapshot2
+    List<Record> batch2 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch2);
+
+    tableResource.table().expireSnapshots().expireSnapshotId(snapshotId).commit();
+
+    Assert.assertEquals(1, tableResource.table().history().size());
+
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .build();
+
+    Assert.assertEquals(
+        FlinkSplitPlanner.ScanMode.BATCH, FlinkSplitPlanner.checkScanMode(scanContext));
+
+    try (CloseableIterator<Row> iter =
+        createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> result1 = waitForResult(iter, 4);
+      List<Record> initialRecords = Lists.newArrayList();
+      initialRecords.addAll(batch1);
+      initialRecords.addAll(batch2);
+      TestHelpers.assertRecords(result1, initialRecords, tableResource.table().schema());
+
+      // snapshot3
+      List<Record> batch3 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch3);
+      tableResource.table().currentSnapshot().snapshotId();
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+    }
+  }
+
   @Test
   public void testEarliestSnapshot() throws Exception {
     GenericAppenderHelper dataAppender =