Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/07 00:20:06 UTC
[iceberg] branch master updated: Flink 1.15: Port PR #4329 to add FLIP-27 enumerator classes (#4979)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5d6c6ccec Flink 1.15: Port PR #4329 to add FLIP-27 enumerator classes (#4979)
5d6c6ccec is described below
commit 5d6c6ccecc43f9d9d2348fddbde45b747016d643
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Jun 6 17:20:01 2022 -0700
Flink 1.15: Port PR #4329 to add FLIP-27 enumerator classes (#4979)
---
.../iceberg/flink/source/FlinkSplitPlanner.java | 79 +++-
.../apache/iceberg/flink/source/ScanContext.java | 149 ++++---
.../flink/source/StreamingStartingStrategy.java | 58 +++
.../enumerator/ContinuousEnumerationResult.java | 58 +++
.../source/enumerator/ContinuousSplitPlanner.java | 35 ++
.../enumerator/ContinuousSplitPlannerImpl.java | 189 +++++++++
.../enumerator/IcebergEnumeratorPosition.java | 82 ++++
.../apache/iceberg/flink/HadoopTableResource.java | 93 +++++
.../apache/iceberg/flink/source/TestFlinkScan.java | 6 +-
.../enumerator/ManualContinuousSplitPlanner.java | 57 +++
.../enumerator/TestContinuousSplitPlannerImpl.java | 463 +++++++++++++++++++++
...estContinuousSplitPlannerImplStartStrategy.java | 187 +++++++++
12 files changed, 1383 insertions(+), 73 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
index a2df652e8..2e5024324 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -76,50 +78,81 @@ public class FlinkSplitPlanner {
}
static CloseableIterable<CombinedScanTask> planTasks(Table table, ScanContext context, ExecutorService workerPool) {
- TableScan scan = table
- .newScan()
- .caseSensitive(context.caseSensitive())
- .project(context.project())
- .planWith(workerPool);
+ ScanMode scanMode = checkScanMode(context);
+ if (scanMode == ScanMode.INCREMENTAL_APPEND_SCAN) {
+ IncrementalAppendScan scan = table.newIncrementalAppendScan();
+ scan = refineScanWithBaseConfigs(scan, context, workerPool);
- if (context.includeColumnStats()) {
- scan = scan.includeColumnStats();
- }
+ if (context.startSnapshotId() != null) {
+ scan = scan.fromSnapshotExclusive(context.startSnapshotId());
+ }
+
+ if (context.endSnapshotId() != null) {
+ scan = scan.toSnapshot(context.endSnapshotId());
+ }
+
+ return scan.planTasks();
+ } else {
+ TableScan scan = table.newScan();
+ scan = refineScanWithBaseConfigs(scan, context, workerPool);
+
+ if (context.snapshotId() != null) {
+ scan = scan.useSnapshot(context.snapshotId());
+ }
+
+ if (context.asOfTimestamp() != null) {
+ scan = scan.asOfTime(context.asOfTimestamp());
+ }
- if (context.snapshotId() != null) {
- scan = scan.useSnapshot(context.snapshotId());
+ return scan.planTasks();
}
+ }
+
+ private enum ScanMode {
+ BATCH,
+ INCREMENTAL_APPEND_SCAN
+ }
- if (context.asOfTimestamp() != null) {
- scan = scan.asOfTime(context.asOfTimestamp());
+ private static ScanMode checkScanMode(ScanContext context) {
+ if (context.isStreaming() || context.startSnapshotId() != null || context.endSnapshotId() != null) {
+ return ScanMode.INCREMENTAL_APPEND_SCAN;
+ } else {
+ return ScanMode.BATCH;
}
+ }
- if (context.startSnapshotId() != null) {
- if (context.endSnapshotId() != null) {
- scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
- } else {
- scan = scan.appendsAfter(context.startSnapshotId());
- }
+ /**
+ * Refine a scan with the common configs shared by all scan modes.
+ */
+ private static <T extends Scan<T>> T refineScanWithBaseConfigs(
+ T scan, ScanContext context, ExecutorService workerPool) {
+ T refinedScan = scan
+ .caseSensitive(context.caseSensitive())
+ .project(context.project())
+ .planWith(workerPool);
+
+ if (context.includeColumnStats()) {
+ refinedScan = refinedScan.includeColumnStats();
}
if (context.splitSize() != null) {
- scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
+ refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
}
if (context.splitLookback() != null) {
- scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
+ refinedScan = refinedScan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
}
if (context.splitOpenFileCost() != null) {
- scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
+ refinedScan = refinedScan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
}
if (context.filters() != null) {
for (Expression filter : context.filters()) {
- scan = scan.filter(filter);
+ refinedScan = refinedScan.filter(filter);
}
}
- return scan.planTasks();
+ return refinedScan;
}
}
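The new refineScanWithBaseConfigs helper leans on Iceberg's self-bounded Scan<T> interface, so the same option-setting code serves both TableScan and IncrementalAppendScan and returns the refined instance of the same concrete type. As a rough sketch of the resulting call pattern for a bounded incremental read (planTasks is package-private, so this assumes a same-package caller; table, workerPool, startId, and endId are illustrative, and process() is a hypothetical consumer):

    ScanContext context = ScanContext.builder()
        .startSnapshotId(startId)   // exclusive lower bound
        .endSnapshotId(endId)       // inclusive upper bound
        .build();
    // A non-null startSnapshotId routes checkScanMode() to INCREMENTAL_APPEND_SCAN.
    try (CloseableIterable<CombinedScanTask> tasks =
        FlinkSplitPlanner.planTasks(table, context, workerPool)) {
      for (CombinedScanTask task : tasks) {
        process(task);   // hypothetical consumer of the planned tasks
      }
    }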
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 806006dd1..84c7652ef 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,9 +23,11 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
@@ -35,7 +37,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
/**
* Context object with optional arguments for a Flink Scan.
*/
-class ScanContext implements Serializable {
+@Internal
+public class ScanContext implements Serializable {
private static final long serialVersionUID = 1L;
@@ -48,6 +51,13 @@ class ScanContext implements Serializable {
private static final ConfigOption<Long> AS_OF_TIMESTAMP =
ConfigOptions.key("as-of-timestamp").longType().defaultValue(null);
+ private static final ConfigOption<StreamingStartingStrategy> STARTING_STRATEGY =
+ ConfigOptions.key("starting-strategy").enumType(StreamingStartingStrategy.class)
+ .defaultValue(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT);
+
+ private static final ConfigOption<Long> START_SNAPSHOT_TIMESTAMP =
+ ConfigOptions.key("start-snapshot-timestamp").longType().defaultValue(null);
+
private static final ConfigOption<Long> START_SNAPSHOT_ID =
ConfigOptions.key("start-snapshot-id").longType().defaultValue(null);
@@ -75,7 +85,9 @@ class ScanContext implements Serializable {
private final boolean caseSensitive;
private final boolean exposeLocality;
private final Long snapshotId;
+ private final StreamingStartingStrategy startingStrategy;
private final Long startSnapshotId;
+ private final Long startSnapshotTimestamp;
private final Long endSnapshotId;
private final Long asOfTimestamp;
private final Long splitSize;
@@ -91,13 +103,15 @@ class ScanContext implements Serializable {
private final boolean includeColumnStats;
private final Integer planParallelism;
- private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
- Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
- boolean isStreaming, Duration monitorInterval, String nameMapping, Schema schema,
- List<Expression> filters, long limit, boolean includeColumnStats, boolean exposeLocality,
- Integer planParallelism) {
+ private ScanContext(boolean caseSensitive, Long snapshotId, StreamingStartingStrategy startingStrategy,
+ Long startSnapshotTimestamp, Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp,
+ Long splitSize, Integer splitLookback, Long splitOpenFileCost, boolean isStreaming,
+ Duration monitorInterval, String nameMapping, Schema schema, List<Expression> filters,
+ long limit, boolean includeColumnStats, boolean exposeLocality, Integer planParallelism) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
+ this.startingStrategy = startingStrategy;
+ this.startSnapshotTimestamp = startSnapshotTimestamp;
this.startSnapshotId = startSnapshotId;
this.endSnapshotId = endSnapshotId;
this.asOfTimestamp = asOfTimestamp;
@@ -114,77 +128,104 @@ class ScanContext implements Serializable {
this.includeColumnStats = includeColumnStats;
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
+
+ validate();
+ }
+
+ private void validate() {
+ if (isStreaming) {
+ if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) {
+ Preconditions.checkArgument(startSnapshotId != null,
+ "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: null");
+ Preconditions.checkArgument(startSnapshotTimestamp == null,
+ "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
+ }
+ if (startingStrategy == StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) {
+ Preconditions.checkArgument(startSnapshotTimestamp != null,
+ "Invalid starting snapshot timestamp for SPECIFIC_START_SNAPSHOT_TIMESTAMP strategy: null");
+ Preconditions.checkArgument(startSnapshotId == null,
+ "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
+ }
+ }
}
- boolean caseSensitive() {
+ public boolean caseSensitive() {
return caseSensitive;
}
- Long snapshotId() {
+ public Long snapshotId() {
return snapshotId;
}
- Long startSnapshotId() {
+ public StreamingStartingStrategy startingStrategy() {
+ return startingStrategy;
+ }
+
+ public Long startSnapshotTimestamp() {
+ return startSnapshotTimestamp;
+ }
+
+ public Long startSnapshotId() {
return startSnapshotId;
}
- Long endSnapshotId() {
+ public Long endSnapshotId() {
return endSnapshotId;
}
- Long asOfTimestamp() {
+ public Long asOfTimestamp() {
return asOfTimestamp;
}
- Long splitSize() {
+ public Long splitSize() {
return splitSize;
}
- Integer splitLookback() {
+ public Integer splitLookback() {
return splitLookback;
}
- Long splitOpenFileCost() {
+ public Long splitOpenFileCost() {
return splitOpenFileCost;
}
- boolean isStreaming() {
+ public boolean isStreaming() {
return isStreaming;
}
- Duration monitorInterval() {
+ public Duration monitorInterval() {
return monitorInterval;
}
- String nameMapping() {
+ public String nameMapping() {
return nameMapping;
}
- Schema project() {
+ public Schema project() {
return schema;
}
- List<Expression> filters() {
+ public List<Expression> filters() {
return filters;
}
- long limit() {
+ public long limit() {
return limit;
}
- boolean includeColumnStats() {
+ public boolean includeColumnStats() {
return includeColumnStats;
}
- boolean exposeLocality() {
+ public boolean exposeLocality() {
return exposeLocality;
}
- Integer planParallelism() {
+ public Integer planParallelism() {
return planParallelism;
}
- ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
+ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
.useSnapshotId(null)
@@ -206,7 +247,7 @@ class ScanContext implements Serializable {
.build();
}
- ScanContext copyWithSnapshotId(long newSnapshotId) {
+ public ScanContext copyWithSnapshotId(long newSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
.useSnapshotId(newSnapshotId)
@@ -228,13 +269,15 @@ class ScanContext implements Serializable {
.build();
}
- static Builder builder() {
+ public static Builder builder() {
return new Builder();
}
- static class Builder {
+ public static class Builder {
private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
private Long snapshotId = SNAPSHOT_ID.defaultValue();
+ private StreamingStartingStrategy startingStrategy = STARTING_STRATEGY.defaultValue();
+ private Long startSnapshotTimestamp = START_SNAPSHOT_TIMESTAMP.defaultValue();
private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
@@ -254,98 +297,110 @@ class ScanContext implements Serializable {
private Builder() {
}
- Builder caseSensitive(boolean newCaseSensitive) {
+ public Builder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}
- Builder useSnapshotId(Long newSnapshotId) {
+ public Builder useSnapshotId(Long newSnapshotId) {
this.snapshotId = newSnapshotId;
return this;
}
- Builder startSnapshotId(Long newStartSnapshotId) {
+ public Builder startingStrategy(StreamingStartingStrategy newStartingStrategy) {
+ this.startingStrategy = newStartingStrategy;
+ return this;
+ }
+
+ public Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
+ this.startSnapshotTimestamp = newStartSnapshotTimestamp;
+ return this;
+ }
+
+ public Builder startSnapshotId(Long newStartSnapshotId) {
this.startSnapshotId = newStartSnapshotId;
return this;
}
- Builder endSnapshotId(Long newEndSnapshotId) {
+ public Builder endSnapshotId(Long newEndSnapshotId) {
this.endSnapshotId = newEndSnapshotId;
return this;
}
- Builder asOfTimestamp(Long newAsOfTimestamp) {
+ public Builder asOfTimestamp(Long newAsOfTimestamp) {
this.asOfTimestamp = newAsOfTimestamp;
return this;
}
- Builder splitSize(Long newSplitSize) {
+ public Builder splitSize(Long newSplitSize) {
this.splitSize = newSplitSize;
return this;
}
- Builder splitLookback(Integer newSplitLookback) {
+ public Builder splitLookback(Integer newSplitLookback) {
this.splitLookback = newSplitLookback;
return this;
}
- Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+ public Builder splitOpenFileCost(Long newSplitOpenFileCost) {
this.splitOpenFileCost = newSplitOpenFileCost;
return this;
}
- Builder streaming(boolean streaming) {
+ public Builder streaming(boolean streaming) {
this.isStreaming = streaming;
return this;
}
- Builder monitorInterval(Duration newMonitorInterval) {
+ public Builder monitorInterval(Duration newMonitorInterval) {
this.monitorInterval = newMonitorInterval;
return this;
}
- Builder nameMapping(String newNameMapping) {
+ public Builder nameMapping(String newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}
- Builder project(Schema newProjectedSchema) {
+ public Builder project(Schema newProjectedSchema) {
this.projectedSchema = newProjectedSchema;
return this;
}
- Builder filters(List<Expression> newFilters) {
+ public Builder filters(List<Expression> newFilters) {
this.filters = newFilters;
return this;
}
- Builder limit(long newLimit) {
+ public Builder limit(long newLimit) {
this.limit = newLimit;
return this;
}
- Builder includeColumnStats(boolean newIncludeColumnStats) {
+ public Builder includeColumnStats(boolean newIncludeColumnStats) {
this.includeColumnStats = newIncludeColumnStats;
return this;
}
- Builder exposeLocality(boolean newExposeLocality) {
+ public Builder exposeLocality(boolean newExposeLocality) {
this.exposeLocality = newExposeLocality;
return this;
}
- Builder planParallelism(Integer parallelism) {
+ public Builder planParallelism(Integer parallelism) {
this.planParallelism = parallelism;
return this;
}
- Builder fromProperties(Map<String, String> properties) {
+ public Builder fromProperties(Map<String, String> properties) {
Configuration config = new Configuration();
properties.forEach(config::setString);
return this.useSnapshotId(config.get(SNAPSHOT_ID))
.caseSensitive(config.get(CASE_SENSITIVE))
.asOfTimestamp(config.get(AS_OF_TIMESTAMP))
+ .startingStrategy(config.get(STARTING_STRATEGY))
+ .startSnapshotTimestamp(config.get(START_SNAPSHOT_TIMESTAMP))
.startSnapshotId(config.get(START_SNAPSHOT_ID))
.endSnapshotId(config.get(END_SNAPSHOT_ID))
.splitSize(config.get(SPLIT_SIZE))
@@ -358,8 +413,8 @@ class ScanContext implements Serializable {
}
public ScanContext build() {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
- endSnapshotId, asOfTimestamp, splitSize, splitLookback,
+ return new ScanContext(caseSensitive, snapshotId, startingStrategy, startSnapshotTimestamp,
+ startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, splitLookback,
splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
filters, limit, includeColumnStats, exposeLocality, planParallelism);
}
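Both new options also flow through fromProperties. A hedged sketch of configuring the streaming start position from string properties (the keys come from the ConfigOptions above; the map contents are made up for illustration, and ImmutableMap is Iceberg's relocated Guava class):

    Map<String, String> props = ImmutableMap.of(
        "starting-strategy", "INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP",
        "start-snapshot-timestamp", "1654560000000");
    ScanContext context = ScanContext.builder()
        .fromProperties(props)
        .streaming(true)   // validate() only checks the start position in streaming mode
        .build();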
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java
new file mode 100644
index 000000000..3e83fbe7f
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+/**
+ * Starting strategy for streaming execution.
+ */
+public enum StreamingStartingStrategy {
+ /**
+ * Do a regular table scan then switch to the incremental mode.
+ * <p>
+ * The incremental mode starts from the current snapshot exclusive.
+ */
+ TABLE_SCAN_THEN_INCREMENTAL,
+
+ /**
+ * Start incremental mode from the latest snapshot inclusive.
+ * <p>
+ * If the table is empty, all future append snapshots should be discovered.
+ */
+ INCREMENTAL_FROM_LATEST_SNAPSHOT,
+
+ /**
+ * Start incremental mode from the earliest snapshot inclusive.
+ * <p>
+ * If the table is empty, all future append snapshots should be discovered.
+ */
+ INCREMENTAL_FROM_EARLIEST_SNAPSHOT,
+
+ /**
+ * Start incremental mode from a snapshot with a specific id inclusive.
+ */
+ INCREMENTAL_FROM_SNAPSHOT_ID,
+
+ /**
+ * Start incremental mode from a snapshot with a specific timestamp inclusive.
+ * <p>
+ * If the timestamp is between two snapshots, it should start from the snapshot after the timestamp.
+ */
+ INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP
+}
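Only the two *_FROM_SNAPSHOT_* strategies take an argument, and ScanContext.validate() enforces that exactly the matching field is set. An illustrative pairing (snapshotId and timestampMillis are placeholders):

    ScanContext byId = ScanContext.builder()
        .streaming(true)
        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
        .startSnapshotId(snapshotId)             // required; startSnapshotTimestamp must stay null
        .build();

    ScanContext byTimestamp = ScanContext.builder()
        .streaming(true)
        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
        .startSnapshotTimestamp(timestampMillis) // required; startSnapshotId must stay null
        .build();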
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java
new file mode 100644
index 000000000..8c20f2cf2
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousEnumerationResult.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class ContinuousEnumerationResult {
+ private final Collection<IcebergSourceSplit> splits;
+ private final IcebergEnumeratorPosition fromPosition;
+ private final IcebergEnumeratorPosition toPosition;
+
+ /**
+ * @param splits should never be null; it can be an empty collection
+ * @param fromPosition can be null
+ * @param toPosition should never be null; its snapshotId and snapshotTimestampMs can be null
+ */
+ ContinuousEnumerationResult(
+ Collection<IcebergSourceSplit> splits,
+ IcebergEnumeratorPosition fromPosition,
+ IcebergEnumeratorPosition toPosition) {
+ Preconditions.checkArgument(splits != null, "Invalid to splits collection: null");
+ Preconditions.checkArgument(toPosition != null, "Invalid end position: null");
+ this.splits = splits;
+ this.fromPosition = fromPosition;
+ this.toPosition = toPosition;
+ }
+
+ public Collection<IcebergSourceSplit> splits() {
+ return splits;
+ }
+
+ public IcebergEnumeratorPosition fromPosition() {
+ return fromPosition;
+ }
+
+ public IcebergEnumeratorPosition toPosition() {
+ return toPosition;
+ }
+}
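The nullability contract matters to the enumerator: the very first discovery cycle has no fromPosition, and an empty table produces a toPosition whose fields are null even though the position object itself is not. A small same-package sketch (splits, snapshotId, and timestampMs are placeholders):

    ContinuousEnumerationResult initial = new ContinuousEnumerationResult(
        splits,                                            // may be empty, never null
        null,                                              // no previous position on the first cycle
        IcebergEnumeratorPosition.of(snapshotId, timestampMs));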
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java
new file mode 100644
index 000000000..1737ae6a5
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlanner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Closeable;
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This interface is introduced so that we can plug in different split planners for unit testing.
+ */
+@Internal
+public interface ContinuousSplitPlanner extends Closeable {
+
+ /**
+ * Discover the files appended between {@code lastPosition} and the current table snapshot.
+ */
+ ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition);
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
new file mode 100644
index 000000000..4d6e89684
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class);
+
+ private final Table table;
+ private final ScanContext scanContext;
+ private final boolean isSharedPool;
+ private final ExecutorService workerPool;
+
+ /**
+ * @param threadName thread name prefix for worker pool to run the split planning.
+ * If null, a shared worker pool will be used.
+ */
+ public ContinuousSplitPlannerImpl(Table table, ScanContext scanContext, String threadName) {
+ this.table = table;
+ this.scanContext = scanContext;
+ this.isSharedPool = threadName == null;
+ this.workerPool = isSharedPool ? ThreadPools.getWorkerPool()
+ : ThreadPools.newWorkerPool("iceberg-plan-worker-pool-" + threadName, scanContext.planParallelism());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!isSharedPool) {
+ workerPool.shutdown();
+ }
+ }
+
+ @Override
+ public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
+ table.refresh();
+ if (lastPosition != null) {
+ return discoverIncrementalSplits(lastPosition);
+ } else {
+ return discoverInitialSplits();
+ }
+ }
+
+ /**
+ * Discover incremental changes between {@code lastPosition} and the current table snapshot.
+ */
+ private ContinuousEnumerationResult discoverIncrementalSplits(IcebergEnumeratorPosition lastPosition) {
+ Snapshot currentSnapshot = table.currentSnapshot();
+ if (currentSnapshot == null) {
+ // empty table
+ Preconditions.checkArgument(lastPosition.snapshotId() == null,
+ "Invalid last enumerated position for an empty table: not null");
+ LOG.info("Skip incremental scan because table is empty");
+ return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
+ } else if (lastPosition.snapshotId() != null && currentSnapshot.snapshotId() == lastPosition.snapshotId()) {
+ LOG.info("Current table snapshot is already enumerated: {}", currentSnapshot.snapshotId());
+ return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
+ } else {
+ IcebergEnumeratorPosition newPosition = IcebergEnumeratorPosition.of(
+ currentSnapshot.snapshotId(), currentSnapshot.timestampMillis());
+ ScanContext incrementalScan = scanContext
+ .copyWithAppendsBetween(lastPosition.snapshotId(), currentSnapshot.snapshotId());
+ List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, incrementalScan, workerPool);
+ LOG.info("Discovered {} splits from incremental scan: " +
+ "from snapshot (exclusive) is {}, to snapshot (inclusive) is {}",
+ splits.size(), lastPosition, newPosition);
+ return new ContinuousEnumerationResult(splits, lastPosition, newPosition);
+ }
+ }
+
+ /**
+ * Discover the initial set of splits based on {@link StreamingStartingStrategy}.
+ *
+ * <li>{@link ContinuousEnumerationResult#splits()} should contain initial splits
+ * discovered from table scan for {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL}.
+ * For all other strategies, splits collection should be empty.
+ * <li>{@link ContinuousEnumerationResult#toPosition()} points to the starting position
+ * for the next incremental split discovery with exclusive behavior. Meaning files committed
+ * by the snapshot from the position in {@code ContinuousEnumerationResult} won't be included
+ * in the next incremental scan.
+ */
+ private ContinuousEnumerationResult discoverInitialSplits() {
+ Optional<Snapshot> startSnapshotOptional = startSnapshot(table, scanContext);
+ if (!startSnapshotOptional.isPresent()) {
+ return new ContinuousEnumerationResult(Collections.emptyList(), null,
+ IcebergEnumeratorPosition.empty());
+ }
+
+ Snapshot startSnapshot = startSnapshotOptional.get();
+ LOG.info("Get starting snapshot id {} based on strategy {}",
+ startSnapshot.snapshotId(), scanContext.startingStrategy());
+ List<IcebergSourceSplit> splits;
+ IcebergEnumeratorPosition toPosition;
+ if (scanContext.startingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
+ // do a batch table scan first
+ splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
+ LOG.info("Discovered {} splits from initial batch table scan with snapshot Id {}",
+ splits.size(), startSnapshot.snapshotId());
+ // For TABLE_SCAN_THEN_INCREMENTAL, incremental mode starts exclusive from the startSnapshot
+ toPosition = IcebergEnumeratorPosition.of(startSnapshot.snapshotId(), startSnapshot.timestampMillis());
+ } else {
+ // For all other modes, starting snapshot should be consumed inclusively.
+ // Use parentId to achieve the inclusive behavior. It is fine if parentId is null.
+ splits = Collections.emptyList();
+ Long parentSnapshotId = startSnapshot.parentId();
+ if (parentSnapshotId != null) {
+ Snapshot parentSnapshot = table.snapshot(parentSnapshotId);
+ Long parentSnapshotTimestampMs = parentSnapshot != null ? parentSnapshot.timestampMillis() : null;
+ toPosition = IcebergEnumeratorPosition.of(parentSnapshotId, parentSnapshotTimestampMs);
+ } else {
+ toPosition = IcebergEnumeratorPosition.empty();
+ }
+
+ LOG.info("Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}",
+ startSnapshot.snapshotId(), startSnapshot.timestampMillis());
+ }
+
+ return new ContinuousEnumerationResult(splits, null, toPosition);
+ }
+
+ /**
+ * Calculate the starting snapshot based on the {@link StreamingStartingStrategy} defined in {@code ScanContext}.
+ * <p>
+ * If the {@link StreamingStartingStrategy} is not {@link StreamingStartingStrategy#TABLE_SCAN_THEN_INCREMENTAL},
+ * the start snapshot should be consumed inclusively.
+ */
+ @VisibleForTesting
+ static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
+ switch (scanContext.startingStrategy()) {
+ case TABLE_SCAN_THEN_INCREMENTAL:
+ case INCREMENTAL_FROM_LATEST_SNAPSHOT:
+ return Optional.ofNullable(table.currentSnapshot());
+ case INCREMENTAL_FROM_EARLIEST_SNAPSHOT:
+ return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
+ case INCREMENTAL_FROM_SNAPSHOT_ID:
+ Snapshot matchedSnapshotById = table.snapshot(scanContext.startSnapshotId());
+ Preconditions.checkArgument(matchedSnapshotById != null,
+ "Start snapshot id not found in history: " + scanContext.startSnapshotId());
+ return Optional.of(matchedSnapshotById);
+ case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
+ long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
+ Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
+ if (matchedSnapshotByTimestamp.timestampMillis() == scanContext.startSnapshotTimestamp()) {
+ return Optional.of(matchedSnapshotByTimestamp);
+ } else {
+ // if the snapshot matched by snapshotIdAsOfTime has a timestamp smaller than
+ // scanContext.startSnapshotTimestamp(), return the child snapshot, whose timestamp is larger
+ return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
+ }
+ default:
+ throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.startingStrategy());
+ }
+ }
+}
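Putting the pieces together, a hedged sketch of how an enumerator might drive the planner: the first call passes null to hit discoverInitialSplits(), and each later call passes the previously returned toPosition (running and assignSplits() are illustrative stand-ins for the enumerator's own state and split assignment):

    ContinuousSplitPlanner planner =
        new ContinuousSplitPlannerImpl(table, scanContext, "my-source");  // dedicated worker pool
    IcebergEnumeratorPosition lastPosition = null;
    while (running) {
      ContinuousEnumerationResult result = planner.planSplits(lastPosition);
      assignSplits(result.splits());       // hand discovered splits to readers
      lastPosition = result.toPosition();  // resume point for the next discovery cycle
    }
    planner.close();  // releases the dedicated pool; may throw IOException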
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java
new file mode 100644
index 000000000..e024473da
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPosition.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+class IcebergEnumeratorPosition {
+ private final Long snapshotId;
+ // Track snapshot timestamp mainly for info logging
+ private final Long snapshotTimestampMs;
+
+ static IcebergEnumeratorPosition empty() {
+ return new IcebergEnumeratorPosition(null, null);
+ }
+
+ static IcebergEnumeratorPosition of(long snapshotId, Long snapshotTimestampMs) {
+ return new IcebergEnumeratorPosition(snapshotId, snapshotTimestampMs);
+ }
+
+ private IcebergEnumeratorPosition(Long snapshotId, Long snapshotTimestampMs) {
+ this.snapshotId = snapshotId;
+ this.snapshotTimestampMs = snapshotTimestampMs;
+ }
+
+ boolean isEmpty() {
+ return snapshotId == null;
+ }
+
+ Long snapshotId() {
+ return snapshotId;
+ }
+
+ Long snapshotTimestampMs() {
+ return snapshotTimestampMs;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("snapshotId", snapshotId)
+ .add("snapshotTimestampMs", snapshotTimestampMs)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ snapshotId,
+ snapshotTimestampMs);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IcebergEnumeratorPosition other = (IcebergEnumeratorPosition) o;
+ return Objects.equal(snapshotId, other.snapshotId()) &&
+ Objects.equal(snapshotTimestampMs, other.snapshotTimestampMs());
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java
new file mode 100644
index 000000000..bc4e209a4
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/HadoopTableResource.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+
+public class HadoopTableResource extends ExternalResource {
+ private final TemporaryFolder temporaryFolder;
+ private final String database;
+ private final String tableName;
+ private final Schema schema;
+ private final PartitionSpec partitionSpec;
+
+ private HadoopCatalog catalog;
+ private TableLoader tableLoader;
+ private Table table;
+
+ public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName, Schema schema) {
+ this(temporaryFolder, database, tableName, schema, null);
+ }
+
+ public HadoopTableResource(TemporaryFolder temporaryFolder, String database, String tableName,
+ Schema schema, PartitionSpec partitionSpec) {
+ this.temporaryFolder = temporaryFolder;
+ this.database = database;
+ this.tableName = tableName;
+ this.schema = schema;
+ this.partitionSpec = partitionSpec;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ File warehouseFile = temporaryFolder.newFolder();
+ Assert.assertTrue(warehouseFile.delete());
+ String warehouse = "file:" + warehouseFile;
+ Configuration hadoopConf = new Configuration();
+ this.catalog = new HadoopCatalog(hadoopConf, warehouse);
+ String location = String.format("%s/%s/%s", warehouse, database, tableName);
+ this.tableLoader = TableLoader.fromHadoopTable(location);
+ if (partitionSpec == null) {
+ this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema);
+ } else {
+ this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec);
+ }
+ tableLoader.open();
+ }
+
+ @Override
+ protected void after() {
+ try {
+ catalog.dropTable(TableIdentifier.of(database, tableName));
+ catalog.close();
+ tableLoader.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to close catalog resource");
+ }
+ }
+
+ public TableLoader tableLoader() {
+ return tableLoader;
+ }
+
+ public Table table() {
+ return table;
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index 357f5ab14..9284b8fa9 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -237,15 +237,15 @@ public abstract class TestFlinkScan {
long snapshotId1 = table.currentSnapshot().snapshotId();
// snapshot 2
- List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
helper.appendToTable(records2);
- List<Record> records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
+ List<Record> records3 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 2L);
helper.appendToTable(records3);
long snapshotId3 = table.currentSnapshot().snapshotId();
// snapshot 4
- helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L));
+ helper.appendToTable(RandomGenericData.generate(TestFixtures.SCHEMA, 1, 3L));
List<Record> expected2 = Lists.newArrayList();
expected2.addAll(records2);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
new file mode 100644
index 000000000..f1db8ef5d
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/ManualContinuousSplitPlanner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class ManualContinuousSplitPlanner implements ContinuousSplitPlanner {
+ private final ArrayDeque<IcebergSourceSplit> splits = new ArrayDeque<>();
+ private IcebergEnumeratorPosition latestPosition;
+
+ @Override
+ public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
+ ContinuousEnumerationResult result = new ContinuousEnumerationResult(
+ Lists.newArrayList(splits), lastPosition, latestPosition);
+ return result;
+ }
+
+ /**
+ * Add new splits to the collection
+ */
+ public void addSplits(List<IcebergSourceSplit> newSplits, IcebergEnumeratorPosition newPosition) {
+ splits.addAll(newSplits);
+ this.latestPosition = newPosition;
+ }
+
+ /**
+ * Clear the splits collection
+ */
+ public void clearSplits() {
+ splits.clear();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
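In enumerator tests, the manual planner gives full control over what each discovery cycle returns. A minimal usage sketch (newSplits, snapshotId, and timestampMs are placeholders):

    ManualContinuousSplitPlanner planner = new ManualContinuousSplitPlanner();
    planner.addSplits(newSplits, IcebergEnumeratorPosition.of(snapshotId, timestampMs));
    ContinuousEnumerationResult result = planner.planSplits(null);
    Assert.assertEquals(newSplits.size(), result.splits().size());
    planner.clearSplits();   // later cycles now return an empty split collection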
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
new file mode 100644
index 000000000..2bcf2f07d
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.joda.time.format.DateTimeFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+public class TestContinuousSplitPlannerImpl {
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private static final FileFormat fileFormat = FileFormat.PARQUET;
+ private static final AtomicLong randomSeed = new AtomicLong();
+
+ @Rule
+ public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+ TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private GenericAppenderHelper dataAppender;
+ private DataFile dataFile1;
+ private Snapshot snapshot1;
+ private DataFile dataFile2;
+ private Snapshot snapshot2;
+
+ @Before
+ public void before() throws IOException {
+ dataAppender = new GenericAppenderHelper(tableResource.table(), fileFormat, TEMPORARY_FOLDER);
+ }
+
+ private void appendTwoSnapshots() throws IOException {
+ // snapshot1
+ List<Record> batch1 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+ dataFile1 = dataAppender.writeFile(null, batch1);
+ dataAppender.appendToTable(dataFile1);
+ snapshot1 = tableResource.table().currentSnapshot();
+
+ // snapshot2
+ List<Record> batch2 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L);
+ dataFile2 = dataAppender.writeFile(null, batch2);
+ dataAppender.appendToTable(dataFile2);
+ snapshot2 = tableResource.table().currentSnapshot();
+ }
+
+ /**
+ * @return the last enumerated position
+ */
+ private IcebergEnumeratorPosition verifyOneCycle(
+ ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition) throws Exception {
+ List<Record> batch = RandomGenericData.generate(TestFixtures.SCHEMA, 2, randomSeed.incrementAndGet());
+ DataFile dataFile = dataAppender.writeFile(null, batch);
+ dataAppender.appendToTable(dataFile);
+ Snapshot snapshot = tableResource.table().currentSnapshot();
+
+ ContinuousEnumerationResult result = splitPlanner.planSplits(lastPosition);
+ Assert.assertEquals(lastPosition.snapshotId(), result.fromPosition().snapshotId());
+ Assert.assertEquals(lastPosition.snapshotTimestampMs(), result.fromPosition().snapshotTimestampMs());
+ Assert.assertEquals(snapshot.snapshotId(), result.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot.timestampMillis(), result.toPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(1, result.splits().size());
+ IcebergSourceSplit split = Iterables.getOnlyElement(result.splits());
+ Assert.assertEquals(1, split.task().files().size());
+ Assert.assertEquals(dataFile.path().toString(),
+ Iterables.getOnlyElement(split.task().files()).file().path().toString());
+ return result.toPosition();
+ }
+
+ @Test
+ public void testTableScanThenIncrementalWithEmptyTable() throws Exception {
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
+ Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
+ Assert.assertTrue(emptyTableInitialDiscoveryResult.toPosition().isEmpty());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs());
+
+ ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner
+ .planSplits(emptyTableInitialDiscoveryResult.toPosition());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.fromPosition().isEmpty());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.toPosition().isEmpty());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs());
+
+ // next 3 snapshots
+ IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {
+ appendTwoSnapshots();
+
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+ Assert.assertNull(initialResult.fromPosition());
+ Assert.assertEquals(snapshot2.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot2.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(1, initialResult.splits().size());
+ IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits());
+ Assert.assertEquals(2, split.task().files().size());
+ Set<String> discoveredFiles = split.task().files().stream()
+ .map(fileScanTask -> fileScanTask.file().path().toString())
+ .collect(Collectors.toSet());
+ Set<String> expectedFiles = ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
+ Assert.assertEquals(expectedFiles, discoveredFiles);
+
+ IcebergEnumeratorPosition lastPosition = initialResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .splitSize(1L)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
+ Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
+ Assert.assertTrue(emptyTableInitialDiscoveryResult.toPosition().isEmpty());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs());
+
+ ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner
+ .planSplits(emptyTableInitialDiscoveryResult.toPosition());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.fromPosition().isEmpty());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.toPosition().isEmpty());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs());
+
+ // latest mode should discover both snapshots, as the latest position is marked when the job starts
+ appendTwoSnapshots();
+ ContinuousEnumerationResult afterTwoSnapshotsAppended = splitPlanner
+ .planSplits(emptyTableSecondDiscoveryResult.toPosition());
+ Assert.assertEquals(2, afterTwoSnapshotsAppended.splits().size());
+
+ // next 3 snapshots
+ IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exception {
+ appendTwoSnapshots();
+
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+ Assert.assertNull(initialResult.fromPosition());
+ // For inclusive behavior, the initial result should point to snapshot1
+ // Then the next incremental scan shall discover files from latest snapshot2 (inclusive)
+ Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(0, initialResult.splits().size());
+
+ ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+ Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+ IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+ Assert.assertEquals(1, split.task().files().size());
+ Set<String> discoveredFiles = split.task().files().stream()
+ .map(fileScanTask -> fileScanTask.file().path().toString())
+ .collect(Collectors.toSet());
+ // should discover dataFile2 appended in snapshot2
+ Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
+ Assert.assertEquals(expectedFiles, discoveredFiles);
+
+ IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception {
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
+ Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotId());
+ Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs());
+
+ ContinuousEnumerationResult emptyTableSecondDiscoveryResult = splitPlanner
+ .planSplits(emptyTableInitialDiscoveryResult.toPosition());
+ Assert.assertTrue(emptyTableSecondDiscoveryResult.splits().isEmpty());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotId());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.fromPosition().snapshotTimestampMs());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotId());
+ Assert.assertNull(emptyTableSecondDiscoveryResult.toPosition().snapshotTimestampMs());
+
+ // next 3 snapshots
+ IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Exception {
+ appendTwoSnapshots();
+
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+ Assert.assertNull(initialResult.fromPosition());
+ // For inclusive behavior, the initial position should point to snapshot1's parent.
+ // Since snapshot1 has no parent, both snapshotId and snapshotTimestampMs are null.
+ Assert.assertNull(initialResult.toPosition().snapshotId());
+ Assert.assertNull(initialResult.toPosition().snapshotTimestampMs());
+ Assert.assertEquals(0, initialResult.splits().size());
+
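+ // the second discovery should scan from the table's beginning through snapshot2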
+ ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+ Assert.assertNull(secondResult.fromPosition().snapshotId());
+ Assert.assertNull(secondResult.fromPosition().snapshotTimestampMs());
+ Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+ IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+ Assert.assertEquals(2, split.task().files().size());
+ Set<String> discoveredFiles = split.task().files().stream()
+ .map(fileScanTask -> fileScanTask.file().path().toString())
+ .collect(Collectors.toSet());
+ // should discover files appended in both snapshot1 and snapshot2
+ Set<String> expectedFiles = ImmutableSet.of(dataFile1.path().toString(), dataFile2.path().toString());
+ Assert.assertEquals(expectedFiles, discoveredFiles);
+
+ IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception {
+ ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(1L)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContextWithInvalidSnapshotId, null);
+
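+ // the table is empty, so any start snapshot id is invalid and planning should fail fast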
+ AssertHelpers.assertThrows("Should detect invalid starting snapshot id",
+ IllegalArgumentException.class,
+ "Start snapshot id not found in history: 1",
+ () -> splitPlanner.planSplits(null));
+ }
+
+ @Test
+ public void testIncrementalFromSnapshotIdWithInvalidIds() throws Exception {
+ appendTwoSnapshots();
+
+ // find a snapshot id that matches neither snapshot1 nor snapshot2
+ long invalidSnapshotId = 0L;
+ while (invalidSnapshotId == snapshot1.snapshotId() || invalidSnapshotId == snapshot2.snapshotId()) {
+ invalidSnapshotId++;
+ }
+
+ ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(invalidSnapshotId)
+ .build();
+
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContextWithInvalidSnapshotId, null);
+
+ AssertHelpers.assertThrows("Should detect invalid starting snapshot id",
+ IllegalArgumentException.class,
+ "Start snapshot id not found in history: " + invalidSnapshotId,
+ () -> splitPlanner.planSplits(null));
+ }
+
+ @Test
+ public void testIncrementalFromSnapshotId() throws Exception {
+ appendTwoSnapshots();
+
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(snapshot2.snapshotId())
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+ Assert.assertNull(initialResult.fromPosition());
+ // For inclusive behavior of snapshot2, the initial result should point to snapshot1 (as snapshot2's parent)
+ Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(0, initialResult.splits().size());
+
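+ // the next discovery should advance the position from snapshot1 to snapshot2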
+ ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+ Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+ IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+ Assert.assertEquals(1, split.task().files().size());
+ Set<String> discoveredFiles = split.task().files().stream()
+ .map(fileScanTask -> fileScanTask.file().path().toString())
+ .collect(Collectors.toSet());
+ // should discover dataFile2 appended in snapshot2
+ Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
+ Assert.assertEquals(expectedFiles, discoveredFiles);
+
+ IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+
+ @Test
+ public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception {
+ ScanContext scanContextWithInvalidSnapshotTimestamp = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(1L)
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContextWithInvalidSnapshotTimestamp, null);
+
+ AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp",
+ IllegalArgumentException.class,
+ "Cannot find a snapshot older than 1970-01-01 00:00:00.001",
+ () -> splitPlanner.planSplits(null));
+ }
+
+ @Test
+ public void testIncrementalFromSnapshotTimestampWithInvalidTimestamp() throws Exception {
+ appendTwoSnapshots();
+
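+ // pick a timestamp older than the earliest snapshot so no starting snapshot qualifies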
+ long invalidSnapshotTimestampMs = snapshot1.timestampMillis() - 1000L;
+ String invalidSnapshotTimestampMsStr = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ .withZoneUTC()
+ .print(invalidSnapshotTimestampMs);
+
+ ScanContext scanContextWithInvalidSnapshotTimestamp = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(invalidSnapshotTimestampMs)
+ .build();
+
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContextWithInvalidSnapshotTimestamp, null);
+
+ AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp",
+ IllegalArgumentException.class,
+ "Cannot find a snapshot older than " + invalidSnapshotTimestampMsStr,
+ () -> splitPlanner.planSplits(null));
+ }
+
+ @Test
+ public void testIncrementalFromSnapshotTimestamp() throws Exception {
+ appendTwoSnapshots();
+
+ ScanContext scanContext = ScanContext.builder()
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(snapshot2.timestampMillis())
+ .build();
+ ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(
+ tableResource.table(), scanContext, null);
+
+ ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+ Assert.assertNull(initialResult.fromPosition());
+ // For inclusive behavior, the initial result should point to snapshot1 (as snapshot2's parent).
+ Assert.assertEquals(snapshot1.snapshotId(), initialResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot1.timestampMillis(), initialResult.toPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(0, initialResult.splits().size());
+
+ ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition());
+ Assert.assertEquals(snapshot1.snapshotId(), secondResult.fromPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot1.timestampMillis(), secondResult.fromPosition().snapshotTimestampMs().longValue());
+ Assert.assertEquals(snapshot2.snapshotId(), secondResult.toPosition().snapshotId().longValue());
+ Assert.assertEquals(snapshot2.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue());
+ IcebergSourceSplit split = Iterables.getOnlyElement(secondResult.splits());
+ Assert.assertEquals(1, split.task().files().size());
+ Set<String> discoveredFiles = split.task().files().stream()
+ .map(fileScanTask -> fileScanTask.file().path().toString())
+ .collect(Collectors.toSet());
+ // should discover dataFile2 appended in snapshot2
+ Set<String> expectedFiles = ImmutableSet.of(dataFile2.path().toString());
+ Assert.assertEquals(expectedFiles, discoveredFiles);
+
+ IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
+ for (int i = 0; i < 3; ++i) {
+ lastPosition = verifyOneCycle(splitPlanner, lastPosition);
+ }
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
new file mode 100644
index 000000000..ef5265340
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+
+public class TestContinuousSplitPlannerImplStartStrategy {
+ private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;
+
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ public final HadoopTableResource tableResource = new HadoopTableResource(temporaryFolder,
+ TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+ @Rule
+ public final TestRule chain = RuleChain
+ .outerRule(temporaryFolder)
+ .around(tableResource);
+
+ private GenericAppenderHelper dataAppender;
+ private Snapshot snapshot1;
+ private Snapshot snapshot2;
+ private Snapshot snapshot3;
+
+ @Before
+ public void before() throws IOException {
+ dataAppender = new GenericAppenderHelper(tableResource.table(), FILE_FORMAT, temporaryFolder);
+ }
+
+ private void appendThreeSnapshots() throws IOException {
+ List<Record> batch1 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
+ dataAppender.appendToTable(batch1);
+ snapshot1 = tableResource.table().currentSnapshot();
+
+ List<Record> batch2 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 1L);
+ dataAppender.appendToTable(batch2);
+ snapshot2 = tableResource.table().currentSnapshot();
+
+ List<Record> batch3 = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 2L);
+ dataAppender.appendToTable(batch3);
+ snapshot3 = tableResource.table().currentSnapshot();
+ }
+
+ @Test
+ public void testTableScanThenIncrementalStrategy() throws IOException {
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+
+ // empty table
+ Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
+
+ appendThreeSnapshots();
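+ // once snapshots exist, this strategy should start from the current (latest) snapshot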
+ Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+ Assert.assertEquals(snapshot3.snapshotId(), startSnapshot.snapshotId());
+ }
+
+ @Test
+ public void testForLatestSnapshotStrategy() throws IOException {
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .build();
+
+ // empty table
+ Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
+
+ appendThreeSnapshots();
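+ // the latest-snapshot strategy should likewise start from the current snapshot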
+ Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+ Assert.assertEquals(snapshot3.snapshotId(), startSnapshot.snapshotId());
+ }
+
+ @Test
+ public void testForEarliestSnapshotStrategy() throws IOException {
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .build();
+
+ // empty table
+ Assert.assertFalse(ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
+
+ appendThreeSnapshots();
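+ // the earliest-snapshot strategy should start from the oldest snapshot in the table history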
+ Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+ Assert.assertEquals(snapshot1.snapshotId(), startSnapshot.snapshotId());
+ }
+
+ @Test
+ public void testForSpecificSnapshotIdStrategy() throws IOException {
+ ScanContext scanContextInvalidSnapshotId = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(1L)
+ .build();
+
+ // empty table
+ AssertHelpers.assertThrows("Should detect invalid starting snapshot id",
+ IllegalArgumentException.class,
+ "Start snapshot id not found in history: 1",
+ () -> ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContextInvalidSnapshotId));
+
+ appendThreeSnapshots();
+
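+ // a valid snapshot id should resolve to that exact snapshot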
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(snapshot2.snapshotId())
+ .build();
+
+ Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+ Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId());
+ }
+
+ @Test
+ public void testForSpecificSnapshotTimestampStrategySnapshot2() throws IOException {
+ ScanContext scanContextInvalidSnapshotTimestamp = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(1L)
+ .build();
+
+ // empty table
+ AssertHelpers.assertThrows("Should detect invalid starting snapshot timestamp",
+ IllegalArgumentException.class,
+ "Cannot find a snapshot older than 1970-01-01 00:00:00.001",
+ () -> ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContextInvalidSnapshotTimestamp));
+
+ appendThreeSnapshots();
+
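+ // a timestamp equal to snapshot2's commit time should resolve to snapshot2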
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(snapshot2.timestampMillis())
+ .build();
+
+ Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+ Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId());
+ }
+
+ @Test
+ public void testForSpecificSnapshotTimestampStrategySnapshot2Minus1() throws IOException {
+ appendThreeSnapshots();
+
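+ // a timestamp that falls between snapshot1 and snapshot2 should resolve to snapshot2, the first snapshot after it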
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(snapshot2.timestampMillis() - 1L)
+ .build();
+
+ Snapshot startSnapshot = ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).get();
+ Assert.assertEquals(snapshot2.snapshotId(), startSnapshot.snapshotId());
+ }
+}