Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/24 23:43:21 UTC
[iceberg] branch master updated: Flink: FLIP-27 Iceberg source and builder (#5109)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 35b8558d7 Flink: FLIP-27 Iceberg source and builder (#5109)
35b8558d7 is described below
commit 35b8558d7c0b5564fb3d47f92721534f45850cd6
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Fri Jun 24 16:43:16 2022 -0700
Flink: FLIP-27 Iceberg source and builder (#5109)
---
.../iceberg/flink/data/FlinkParquetReaders.java | 8 +-
.../apache/iceberg/flink/source/IcebergSource.java | 290 +++++++++++++++++
.../apache/iceberg/flink/source/ScanContext.java | 2 +-
.../flink/source/assigner/SimpleSplitAssigner.java | 14 +-
.../enumerator/ContinuousSplitPlannerImpl.java | 8 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 55 ++++
.../org/apache/iceberg/flink/TestFixtures.java | 11 +
.../iceberg/flink/data/RowDataToRowMapper.java | 51 +++
.../flink/source/TestIcebergSourceBounded.java | 126 ++++++++
.../flink/source/TestIcebergSourceContinuous.java | 353 +++++++++++++++++++++
.../flink/source/TestIcebergSourceFailover.java | 299 +++++++++++++++++
.../source/TestIcebergSourceReaderDeletes.java | 115 +++++++
12 files changed, 1321 insertions(+), 11 deletions(-)
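Before the diff, for orientation: a minimal sketch of how the new FLIP-27 source is wired into a Flink job. It follows the same pattern as the bounded test added by this commit; the env, tableLoader, and table variables are assumed to be set up by the caller.

    // Build the new FLIP-27 Iceberg source and read it as a DataStream.
    IcebergSource<RowData> source = IcebergSource.<RowData>builder()
        .tableLoader(tableLoader)
        .assignerFactory(new SimpleSplitAssignerFactory())
        .readerFunction(new RowDataReaderFunction(new Configuration(), table.schema(),
            null, null, false, table.io(), table.encryption()))
        .build();

    DataStream<RowData> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "IcebergSource",
        TypeInformation.of(RowData.class));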
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index b0fb3538e..30184d899 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -24,10 +24,12 @@ import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneOffset;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
@@ -482,8 +484,10 @@ public class FlinkParquetReaders {
@Override
protected ArrayData buildList(ReusableArrayData list) {
- list.setNumElements(writePos);
- return list;
+ // Since ReusableArrayData is not accepted by Flink, use GenericArrayData temporarily to work around it.
+ // Revert this to use ReusableArrayData once it is fixed in Flink.
+ // For reference, see https://issues.apache.org/jira/browse/FLINK-25238.
+ return new GenericArrayData(Arrays.copyOf(list.values, writePos));
}
}
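The hunk above swaps the reused, mutable array for an immutable snapshot. A minimal sketch of the copy semantics the workaround relies on (the variables here are illustrative, not the reader's actual fields):

    // The reusable buffer may have spare capacity beyond the written elements.
    Object[] values = new Object[] {1, 2, 3, null}; // capacity 4
    int writePos = 3; // only 3 elements were written
    // Arrays.copyOf produces a fresh, right-sized array, so later reuse of
    // `values` cannot mutate the ArrayData already handed to Flink.
    ArrayData snapshot = new GenericArrayData(Arrays.copyOf(values, writePos));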
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
new file mode 100644
index 000000000..1d604cf50
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.ReaderMetricsContext;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
+
+ private final TableLoader tableLoader;
+ private final ScanContext scanContext;
+ private final ReaderFunction<T> readerFunction;
+ private final SplitAssignerFactory assignerFactory;
+
+ IcebergSource(
+ TableLoader tableLoader,
+ ScanContext scanContext,
+ ReaderFunction<T> readerFunction,
+ SplitAssignerFactory assignerFactory) {
+ this.tableLoader = tableLoader;
+ this.scanContext = scanContext;
+ this.readerFunction = readerFunction;
+ this.assignerFactory = assignerFactory;
+ }
+
+ private static Table loadTable(TableLoader tableLoader) {
+ tableLoader.open();
+ try (TableLoader loader = tableLoader) {
+ return loader.loadTable();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close table loader", e);
+ }
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
+ ReaderMetricsContext readerMetrics =
+ new ReaderMetricsContext(readerContext.metricGroup());
+ return new IcebergSourceReader<>(readerFunction, readerContext, readerMetrics);
+ }
+
+ @Override
+ public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
+ return createEnumerator(enumContext, null);
+ }
+
+ @Override
+ public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumContext, IcebergEnumeratorState enumState) {
+ return createEnumerator(enumContext, enumState);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
+ return IcebergSourceSplitSerializer.INSTANCE;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
+ return IcebergEnumeratorStateSerializer.INSTANCE;
+ }
+
+ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumContext,
+ @Nullable IcebergEnumeratorState enumState) {
+ Table table = loadTable(tableLoader);
+ SplitAssigner assigner;
+ if (enumState == null) {
+ assigner = assignerFactory.createAssigner();
+ } else {
+ LOG.info("Iceberg source restored {} splits from state for table {}",
+ enumState.pendingSplits().size(), table.name());
+ assigner = assignerFactory.createAssigner(enumState.pendingSplits());
+ }
+
+ if (scanContext.isStreaming()) {
+ // Ideally, the operatorId should be used as the threadPoolName, since Flink guarantees its uniqueness within a job.
+ // However, SplitEnumeratorContext doesn't expose the OperatorCoordinator.Context, which would contain the OperatorID.
+ // We need to discuss with the Flink community whether it is ok to expose a public API like the protected method
+ // "OperatorCoordinator.Context getCoordinatorContext()" from the SourceCoordinatorContext implementation.
+ // For now, <table name>-<random UUID> is used as the unique thread pool name.
+ ContinuousSplitPlanner splitPlanner = new ContinuousSplitPlannerImpl(
+ table, scanContext, table.name() + "-" + UUID.randomUUID());
+ return new ContinuousIcebergEnumerator(enumContext, assigner, scanContext, splitPlanner, enumState);
+ } else {
+ return new StaticIcebergEnumerator(enumContext, assigner, table, scanContext, enumState);
+ }
+ }
+
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ public static class Builder<T> {
+
+ // required
+ private TableLoader tableLoader;
+ private SplitAssignerFactory splitAssignerFactory;
+ private ReaderFunction<T> readerFunction;
+
+ // optional
+ private final ScanContext.Builder contextBuilder = ScanContext.builder();
+
+ Builder() {
+ }
+
+ public Builder<T> tableLoader(TableLoader loader) {
+ this.tableLoader = loader;
+ return this;
+ }
+
+ public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
+ this.splitAssignerFactory = assignerFactory;
+ return this;
+ }
+
+ public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
+ this.readerFunction = newReaderFunction;
+ return this;
+ }
+
+ public Builder<T> caseSensitive(boolean newCaseSensitive) {
+ this.contextBuilder.caseSensitive(newCaseSensitive);
+ return this;
+ }
+
+ public Builder<T> useSnapshotId(Long newSnapshotId) {
+ this.contextBuilder.useSnapshotId(newSnapshotId);
+ return this;
+ }
+
+ public Builder<T> streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
+ this.contextBuilder.startingStrategy(newStartingStrategy);
+ return this;
+ }
+
+ public Builder<T> startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
+ this.contextBuilder.startSnapshotTimestamp(newStartSnapshotTimestamp);
+ return this;
+ }
+
+ public Builder<T> startSnapshotId(Long newStartSnapshotId) {
+ this.contextBuilder.startSnapshotId(newStartSnapshotId);
+ return this;
+ }
+
+ public Builder<T> endSnapshotId(Long newEndSnapshotId) {
+ this.contextBuilder.endSnapshotId(newEndSnapshotId);
+ return this;
+ }
+
+ public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
+ this.contextBuilder.asOfTimestamp(newAsOfTimestamp);
+ return this;
+ }
+
+ public Builder<T> splitSize(Long newSplitSize) {
+ this.contextBuilder.splitSize(newSplitSize);
+ return this;
+ }
+
+ public Builder<T> splitLookback(Integer newSplitLookback) {
+ this.contextBuilder.splitLookback(newSplitLookback);
+ return this;
+ }
+
+ public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
+ this.contextBuilder.splitOpenFileCost(newSplitOpenFileCost);
+ return this;
+ }
+
+ public Builder<T> streaming(boolean streaming) {
+ this.contextBuilder.streaming(streaming);
+ return this;
+ }
+
+ public Builder<T> monitorInterval(Duration newMonitorInterval) {
+ this.contextBuilder.monitorInterval(newMonitorInterval);
+ return this;
+ }
+
+ public Builder<T> nameMapping(String newNameMapping) {
+ this.contextBuilder.nameMapping(newNameMapping);
+ return this;
+ }
+
+ public Builder<T> project(Schema newProjectedSchema) {
+ this.contextBuilder.project(newProjectedSchema);
+ return this;
+ }
+
+ public Builder<T> filters(List<Expression> newFilters) {
+ this.contextBuilder.filters(newFilters);
+ return this;
+ }
+
+ public Builder<T> limit(long newLimit) {
+ this.contextBuilder.limit(newLimit);
+ return this;
+ }
+
+ public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
+ this.contextBuilder.includeColumnStats(newIncludeColumnStats);
+ return this;
+ }
+
+ public Builder<T> planParallelism(int planParallelism) {
+ this.contextBuilder.planParallelism(planParallelism);
+ return this;
+ }
+
+ public Builder<T> properties(Map<String, String> properties) {
+ this.contextBuilder.fromProperties(properties);
+ return this;
+ }
+
+ public IcebergSource<T> build() {
+ checkRequired();
+ return new IcebergSource<T>(
+ tableLoader,
+ contextBuilder.build(),
+ readerFunction,
+ splitAssignerFactory);
+ }
+
+ private void checkRequired() {
+ Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+ Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
+ Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
+ }
+ }
+}
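For the streaming path, a sketch of the builder options that TestIcebergSourceContinuous (later in this commit) exercises; tableLoader and readerFunction are assumed to be constructed as in the bounded example above:

    IcebergSource<RowData> streamingSource = IcebergSource.<RowData>builder()
        .tableLoader(tableLoader)
        .assignerFactory(new SimpleSplitAssignerFactory())
        .readerFunction(readerFunction)
        .streaming(true)
        .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
        .monitorInterval(Duration.ofSeconds(10))
        .build();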
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 905205c88..2fe47cc7f 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -163,7 +163,7 @@ public class ScanContext implements Serializable {
return snapshotId;
}
- public StreamingStartingStrategy startingStrategy() {
+ public StreamingStartingStrategy streamingStartingStrategy() {
return startingStrategy;
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
index 51ce3d113..ed70ad377 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
@@ -63,14 +63,20 @@ public class SimpleSplitAssigner implements SplitAssigner {
@Override
public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
- pendingSplits.addAll(splits);
- completeAvailableFuturesIfNeeded();
+ addSplits(splits);
}
@Override
public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
- pendingSplits.addAll(splits);
- completeAvailableFuturesIfNeeded();
+ addSplits(splits);
+ }
+
+ private void addSplits(Collection<IcebergSourceSplit> splits) {
+ if (!splits.isEmpty()) {
+ pendingSplits.addAll(splits);
+ // only complete pending future if new splits are discovered
+ completeAvailableFuturesIfNeeded();
+ }
}
/**
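The refactored addSplits completes the pending availability future only when new splits actually arrive, so readers are not woken up for empty discovery cycles. A standalone sketch of that pattern (all names illustrative, not the assigner's actual code):

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;

    class AvailabilityQueue<T> {
      private final Queue<T> pending = new ArrayDeque<>();
      private CompletableFuture<Void> availableFuture;

      synchronized CompletableFuture<Void> isAvailable() {
        if (!pending.isEmpty()) {
          return CompletableFuture.completedFuture(null);
        }
        if (availableFuture == null) {
          availableFuture = new CompletableFuture<>();
        }
        return availableFuture;
      }

      synchronized void add(Collection<T> items) {
        if (items.isEmpty()) {
          return; // nothing new: leave any pending future untouched
        }
        pending.addAll(items);
        if (availableFuture != null) {
          availableFuture.complete(null); // wake up the waiting poller
          availableFuture = null;
        }
      }
    }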
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 4d6e89684..2bbaaf940 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -123,10 +123,10 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
Snapshot startSnapshot = startSnapshotOptional.get();
LOG.info("Get starting snapshot id {} based on strategy {}",
- startSnapshot.snapshotId(), scanContext.startingStrategy());
+ startSnapshot.snapshotId(), scanContext.streamingStartingStrategy());
List<IcebergSourceSplit> splits;
IcebergEnumeratorPosition toPosition;
- if (scanContext.startingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
+ if (scanContext.streamingStartingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
// do a batch table scan first
splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
LOG.info("Discovered {} splits from initial batch table scan with snapshot Id {}",
@@ -161,7 +161,7 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
*/
@VisibleForTesting
static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
- switch (scanContext.startingStrategy()) {
+ switch (scanContext.streamingStartingStrategy()) {
case TABLE_SCAN_THEN_INCREMENTAL:
case INCREMENTAL_FROM_LATEST_SNAPSHOT:
return Optional.ofNullable(table.currentSnapshot());
@@ -183,7 +183,7 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
}
default:
- throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.startingStrategy());
+ throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.streamingStartingStrategy());
}
}
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 92e935f25..a6f70453b 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.flink;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -193,6 +194,60 @@ public class SimpleDataUtil {
assertTableRecords(table, convertToRecords(expected));
}
+ /**
+ * Get all rows for a table.
+ */
+ public static List<Record> tableRecords(Table table) throws IOException {
+ table.refresh();
+ List<Record> records = Lists.newArrayList();
+ try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+ for (Record record : iterable) {
+ records.add(record);
+ }
+ }
+ return records;
+ }
+
+ private static boolean equalsRecords(List<Record> expected, List<Record> actual, Schema schema) {
+ if (expected.size() != actual.size()) {
+ return false;
+ }
+ Types.StructType type = schema.asStruct();
+ StructLikeSet expectedSet = StructLikeSet.create(type);
+ expectedSet.addAll(expected);
+ StructLikeSet actualSet = StructLikeSet.create(type);
+ actualSet.addAll(actual);
+ return expectedSet.equals(actualSet);
+ }
+
+ private static void assertRecordsEqual(List<Record> expected, List<Record> actual, Schema schema) {
+ Assert.assertEquals(expected.size(), actual.size());
+ Types.StructType type = schema.asStruct();
+ StructLikeSet expectedSet = StructLikeSet.create(type);
+ expectedSet.addAll(expected);
+ StructLikeSet actualSet = StructLikeSet.create(type);
+ actualSet.addAll(actual);
+ Assert.assertEquals(expectedSet, actualSet);
+ }
+
+ /**
+ * Assert that the table contains the expected list of records, polling
+ * up to {@code maxCheckCount} times with {@code checkInterval} between checks.
+ */
+ public static void assertTableRecords(
+ Table table, List<Record> expected, Duration checkInterval, int maxCheckCount)
+ throws IOException, InterruptedException {
+ for (int i = 0; i < maxCheckCount; ++i) {
+ if (equalsRecords(expected, tableRecords(table), table.schema())) {
+ break;
+ } else {
+ Thread.sleep(checkInterval.toMillis());
+ }
+ }
+ // success or failure, assert on the latest table state
+ assertRecordsEqual(tableRecords(table), expected, table.schema());
+ }
+
public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
table.refresh();
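A usage sketch of the new polling assertion, as the failover tests later in this commit call it; checkInterval times maxCheckCount bounds the total wait before the final assertion:

    // Poll every 10 ms for up to 12000 checks (about two minutes), then
    // assert on the latest table state either way.
    SimpleDataUtil.assertTableRecords(table, expectedRecords, Duration.ofMillis(10), 12000);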
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
index 71532c594..c3c280cec 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
@@ -47,6 +47,17 @@ public class TestFixtures {
public static final String DATABASE = "default";
public static final String TABLE = "t";
+ public static final String SINK_TABLE = "t_sink";
public static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DATABASE, TABLE);
+
+ public static final Schema TS_SCHEMA = new Schema(
+ required(1, "ts", Types.TimestampType.withoutZone()),
+ required(2, "str", Types.StringType.get()));
+
+ public static final PartitionSpec TS_SPEC = PartitionSpec.builderFor(TS_SCHEMA)
+ .hour("ts")
+ .build();
+
+ public static final RowType TS_ROW_TYPE = FlinkSchemaUtil.convert(TS_SCHEMA);
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/data/RowDataToRowMapper.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/data/RowDataToRowMapper.java
new file mode 100644
index 000000000..549a6ed3a
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/data/RowDataToRowMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+public class RowDataToRowMapper extends RichMapFunction<RowData, Row> {
+
+ private final RowType rowType;
+
+ private transient DataStructureConverter<Object, Object> converter;
+
+ public RowDataToRowMapper(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.converter = DataStructureConverters.getConverter(
+ TypeConversions.fromLogicalToDataType(rowType));
+ }
+
+ @Override
+ public Row map(RowData value) throws Exception {
+ return (Row) converter.toExternal(value);
+ }
+}
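A usage sketch of the mapper, matching how the source tests below convert the source's internal rows into external Rows for assertions; the RowType must match the schema the source reads with:

    DataStream<Row> rows = stream // a DataStream<RowData> from env.fromSource(...)
        .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));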
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
new file mode 100644
index 000000000..5c8243ada
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergSourceBounded extends TestFlinkScan {
+
+ public TestIcebergSourceBounded(String fileFormat) {
+ super(fileFormat);
+ }
+
+ @Override
+ protected List<Row> runWithProjection(String... projected) throws Exception {
+ Schema icebergTableSchema = catalog.loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
+ TableSchema.Builder builder = TableSchema.builder();
+ TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTableSchema));
+ for (String field : projected) {
+ TableColumn column = schema.getTableColumn(field).get();
+ builder.field(column.getName(), column.getType());
+ }
+ TableSchema flinkSchema = builder.build();
+ Schema projectedSchema = FlinkSchemaUtil.convert(icebergTableSchema, flinkSchema);
+ return run(projectedSchema, null, null);
+ }
+
+ @Override
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+ return run(null, Arrays.asList(filter), null);
+ }
+
+ @Override
+ protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
+ return run(null, null, options);
+ }
+
+ @Override
+ protected List<Row> run() throws Exception {
+ return run(null, null, null);
+ }
+
+ private List<Row> run(Schema projectedSchema, List<Expression> filters,
+ Map<String, String> options) throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ Configuration config = new Configuration();
+ config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+ Table table;
+ try (TableLoader tableLoader = tableLoader()) {
+ tableLoader.open();
+ table = tableLoader.loadTable();
+ }
+
+ IcebergSource.Builder<RowData> sourceBuilder = IcebergSource.<RowData>builder()
+ .tableLoader(tableLoader())
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .readerFunction(new RowDataReaderFunction(config, table.schema(), projectedSchema,
+ null, false, table.io(), table.encryption()));
+ if (projectedSchema != null) {
+ sourceBuilder.project(projectedSchema);
+ }
+ if (filters != null) {
+ sourceBuilder.filters(filters);
+ }
+ if (options != null) {
+ sourceBuilder.properties(options);
+ }
+
+ DataStream<Row> stream = env.fromSource(
+ sourceBuilder.build(),
+ WatermarkStrategy.noWatermarks(),
+ "testBasicRead",
+ TypeInformation.of(RowData.class))
+ .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(
+ projectedSchema == null ? table.schema() : projectedSchema)));
+
+ try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
+ return Lists.newArrayList(iter);
+ }
+ }
+
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
new file mode 100644
index 000000000..80475ee4f
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceContinuous {
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+ MiniClusterResource.createWithClassloaderCheckDisabled();
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+ TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
+ private final AtomicLong randomSeed = new AtomicLong(0L);
+
+ @Test
+ public void testTableScanThenIncremental() throws Exception {
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // snapshot1
+ List<Record> batch1 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+ .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build();
+
+ try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> result1 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+ // snapshot2
+ List<Record> batch2 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+ tableResource.table().currentSnapshot().snapshotId();
+
+ List<Row> result2 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+ // snapshot3
+ List<Record> batch3 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+ tableResource.table().currentSnapshot().snapshotId();
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+ }
+ }
+
+ @Test
+ public void testEarliestSnapshot() throws Exception {
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // snapshot0
+ List<Record> batch0 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch0);
+
+ // snapshot1
+ List<Record> batch1 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+ .build();
+
+ try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> result1 = waitForResult(iter, 4);
+ List<Record> combinedBatch0AndBatch1 = Lists.newArrayList(batch0);
+ combinedBatch0AndBatch1.addAll(batch1);
+ TestHelpers.assertRecords(result1, combinedBatch0AndBatch1, tableResource.table().schema());
+
+ // snapshot2
+ List<Record> batch2 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+
+ List<Row> result2 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+ // snapshot3
+ List<Record> batch3 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+ }
+ }
+
+ @Test
+ public void testLatestSnapshot() throws Exception {
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // snapshot0
+ List<Record> batch0 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch0);
+
+ // snapshot1
+ List<Record> batch1 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .build();
+
+ try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ // Make sure the job is running first so that the enumerator can
+ // start from the latest snapshot before batch2 is inserted below.
+ waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient());
+
+ // inclusive behavior for starting snapshot
+ List<Row> result1 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+ // snapshot2
+ List<Record> batch2 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+
+ List<Row> result2 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+ // snapshot3
+ List<Record> batch3 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+ }
+ }
+
+ @Test
+ public void testSpecificSnapshotId() throws Exception {
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // snapshot0
+ List<Record> batch0 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch0);
+ long snapshot0 = tableResource.table().currentSnapshot().snapshotId();
+
+ // snapshot1
+ List<Record> batch1 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+ long snapshot1 = tableResource.table().currentSnapshot().snapshotId();
+
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(snapshot1)
+ .build();
+
+ try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ List<Row> result1 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+ // snapshot2
+ List<Record> batch2 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+
+ List<Row> result2 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+ // snapshot3
+ List<Record> batch3 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+ }
+ }
+
+ @Test
+ public void testSpecificSnapshotTimestamp() throws Exception {
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+ // snapshot0
+ List<Record> batch0 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch0);
+ long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis();
+
+ // sleep for 2 ms to make sure snapshot1 has a higher timestamp value
+ Thread.sleep(2);
+
+ // snapshot1
+ List<Record> batch1 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch1);
+ long snapshot1Timestamp = tableResource.table().currentSnapshot().timestampMillis();
+
+ ScanContext scanContext = ScanContext.builder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10L))
+ .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+ .startSnapshotTimestamp(snapshot1Timestamp)
+ .build();
+
+ try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+ // consume data from snapshot1
+ List<Row> result1 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+ // snapshot2
+ List<Record> batch2 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch2);
+
+ List<Row> result2 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+ // snapshot3
+ List<Record> batch3 = RandomGenericData.generate(
+ tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+ dataAppender.appendToTable(batch3);
+
+ List<Row> result3 = waitForResult(iter, 2);
+ TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+ }
+ }
+
+ private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
+ Table table = tableResource.table();
+ Configuration config = new Configuration();
+ // start the source and collect output
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ DataStream<Row> stream = env.fromSource(
+ IcebergSource.<RowData>builder()
+ .tableLoader(tableResource.tableLoader())
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .readerFunction(new RowDataReaderFunction(config, table.schema(), null,
+ null, false, table.io(), table.encryption()))
+ .streaming(scanContext.isStreaming())
+ .streamingStartingStrategy(scanContext.streamingStartingStrategy())
+ .startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
+ .startSnapshotId(scanContext.startSnapshotId())
+ .monitorInterval(Duration.ofMillis(10L))
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "icebergSource",
+ TypeInformation.of(RowData.class))
+ .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(tableResource.table().schema())));
+ return stream;
+ }
+
+ public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
+ List<Row> results = Lists.newArrayListWithCapacity(limit);
+ while (results.size() < limit) {
+ if (iter.hasNext()) {
+ results.add(iter.next());
+ } else {
+ break;
+ }
+ }
+ return results;
+ }
+
+ public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+ while (getRunningJobs(client).isEmpty()) {
+ Thread.sleep(10);
+ }
+ }
+
+ public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+ Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+ return statusMessages.stream()
+ .filter(status -> status.getJobState() == JobStatus.RUNNING)
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
new file mode 100644
index 000000000..f6f522c9d
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceFailover {
+
+ private static final int PARALLELISM = 4;
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
+ @Rule
+ public final HadoopTableResource sourceTableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+ TestFixtures.DATABASE, TestFixtures.TABLE, schema());
+
+ @Rule
+ public final HadoopTableResource sinkTableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+ TestFixtures.DATABASE, TestFixtures.SINK_TABLE, schema());
+
+ protected IcebergSource.Builder<RowData> sourceBuilder() {
+ Table sourceTable = sourceTableResource.table();
+ Configuration config = new Configuration();
+ config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+ return IcebergSource.<RowData>builder()
+ .tableLoader(sourceTableResource.tableLoader())
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .readerFunction(new RowDataReaderFunction(config, sourceTable.schema(), null,
+ null, false, sourceTable.io(), sourceTable.encryption()));
+ }
+
+ protected Schema schema() {
+ return TestFixtures.SCHEMA;
+ }
+
+ protected List<Record> generateRecords(int numRecords, long seed) {
+ return RandomGenericData.generate(schema(), numRecords, seed);
+ }
+
+ protected void assertRecords(Table table, List<Record> expectedRecords, Duration interval, int maxCount)
+ throws Exception {
+ SimpleDataUtil.assertTableRecords(table,
+ expectedRecords, interval, maxCount);
+ }
+
+ @Test
+ public void testBoundedWithTaskManagerFailover() throws Exception {
+ testBoundedIcebergSource(FailoverType.TM);
+ }
+
+ @Test
+ public void testBoundedWithJobManagerFailover() throws Exception {
+ testBoundedIcebergSource(FailoverType.JM);
+ }
+
+ private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
+ List<Record> expectedRecords = Lists.newArrayList();
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+ for (int i = 0; i < 4; ++i) {
+ List<Record> records = generateRecords(2, i);
+ expectedRecords.addAll(records);
+ dataAppender.appendToTable(records);
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+ DataStream<RowData> stream = env.fromSource(
+ sourceBuilder().build(),
+ WatermarkStrategy.noWatermarks(),
+ "IcebergSource",
+ TypeInformation.of(RowData.class));
+
+ DataStream<RowData> streamFailingInTheMiddleOfReading =
+ RecordCounterToFail.wrapWithFailureAfter(stream, expectedRecords.size() / 2);
+
+ // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
+ // exactly-once behavior. With the Iceberg sink, we can verify end-to-end
+ // exactly-once. Here we mainly care about the source's exactly-once behavior.
+ FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
+ .table(sinkTableResource.table())
+ .tableLoader(sinkTableResource.tableLoader())
+ .append();
+
+ JobClient jobClient = env.executeAsync("Bounded Iceberg Source Failover Test");
+ JobID jobId = jobClient.getJobID();
+
+ RecordCounterToFail.waitToFail();
+ triggerFailover(
+ failoverType,
+ jobId,
+ RecordCounterToFail::continueProcessing,
+ miniClusterResource.getMiniCluster());
+
+ assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+ }
+
+ @Test
+ public void testContinuousWithTaskManagerFailover() throws Exception {
+ testContinuousIcebergSource(FailoverType.TM);
+ }
+
+ @Test
+ public void testContinuousWithJobManagerFailover() throws Exception {
+ testContinuousIcebergSource(FailoverType.JM);
+ }
+
+ private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {
+ GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+ sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+ List<Record> expectedRecords = Lists.newArrayList();
+
+ List<Record> batch = generateRecords(2, 0);
+ expectedRecords.addAll(batch);
+ dataAppender.appendToTable(batch);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(10L);
+ Configuration config = new Configuration();
+ config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+
+ DataStream<RowData> stream = env.fromSource(
+ sourceBuilder()
+ .streaming(true)
+ .monitorInterval(Duration.ofMillis(10))
+ .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "IcebergSource",
+ TypeInformation.of(RowData.class));
+
+ // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
+ // exactly-once behavior. With the Iceberg sink, we can verify end-to-end
+ // exactly-once. Here we mainly care about the source's exactly-once behavior.
+ FlinkSink.forRowData(stream)
+ .table(sinkTableResource.table())
+ .tableLoader(sinkTableResource.tableLoader())
+ .append();
+
+ JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test");
+ JobID jobId = jobClient.getJobID();
+
+ for (int i = 1; i < 5; i++) {
+ Thread.sleep(10);
+ List<Record> records = generateRecords(2, i);
+ expectedRecords.addAll(records);
+ dataAppender.appendToTable(records);
+ if (i == 2) {
+ triggerFailover(failoverType, jobId, () -> {
+ }, miniClusterResource.getMiniCluster());
+ }
+ }
+
+ // wait longer for continuous source to reduce flakiness
+ // because CI servers tend to be overloaded.
+ assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities copied from Flink's FileSourceTextLinesITCase
+ // ------------------------------------------------------------------------
+
+ private enum FailoverType {
+ NONE,
+ TM,
+ JM
+ }
+
+ private static void triggerFailover(
+ FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster)
+ throws Exception {
+ switch (type) {
+ case NONE:
+ afterFailAction.run();
+ break;
+ case TM:
+ restartTaskManager(afterFailAction, miniCluster);
+ break;
+ case JM:
+ triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
+ break;
+ }
+ }
+
+ private static void triggerJobManagerFailover(
+ JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
+ HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
+ haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+ afterFailAction.run();
+ haLeadershipControl.grantJobMasterLeadership(jobId).get();
+ }
+
+ private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster)
+ throws Exception {
+ miniCluster.terminateTaskManager(0).get();
+ afterFailAction.run();
+ miniCluster.startTaskManager();
+ }
+
+ private static class RecordCounterToFail {
+
+ private static AtomicInteger records;
+ private static CompletableFuture<Void> fail;
+ private static CompletableFuture<Void> continueProcessing;
+
+ private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
+ records = new AtomicInteger();
+ fail = new CompletableFuture<>();
+ continueProcessing = new CompletableFuture<>();
+ return stream.map(
+ record -> {
+ boolean reachedFailPoint = records.incrementAndGet() > failAfter;
+ boolean notFailedYet = !fail.isDone();
+ if (notFailedYet && reachedFailPoint) {
+ fail.complete(null);
+ continueProcessing.get();
+ }
+ return record;
+ });
+ }
+
+ private static void waitToFail() throws ExecutionException, InterruptedException {
+ fail.get();
+ }
+
+ private static void continueProcessing() {
+ continueProcessing.complete(null);
+ }
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java
new file mode 100644
index 000000000..08fb159dd
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase {
+
+ private static final int PARALLELISM = 4;
+
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ public TestIcebergSourceReaderDeletes(FileFormat inputFormat) {
+ super(inputFormat);
+ }
+
+ @Override
+ protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) throws IOException {
+ Schema projected = testTable.schema().select(columns);
+ RowType rowType = FlinkSchemaUtil.convert(projected);
+
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CatalogProperties.WAREHOUSE_LOCATION, hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
+ properties.put(CatalogProperties.URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+ properties.put(CatalogProperties.CLIENT_POOL_SIZE,
+ Integer.toString(hiveConf.getInt("iceberg.hive.client-pool-size", 5)));
+ CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(), hiveConf, properties);
+ TableLoader hiveTableLoader = TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", tableName));
+ hiveTableLoader.open();
+ try (TableLoader tableLoader = hiveTableLoader) {
+ Configuration config = new Configuration();
+ Table table = tableLoader.loadTable();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ DataStream<RowData> stream = env.fromSource(
+ IcebergSource.<RowData>builder()
+ .tableLoader(tableLoader)
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .readerFunction(new RowDataReaderFunction(config, table.schema(), projected,
+ null, false, table.io(), table.encryption()))
+ .project(projected)
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "testBasicRead",
+ TypeInformation.of(RowData.class));
+
+ try (CloseableIterator<RowData> iter = stream.executeAndCollect()) {
+ List<RowData> rowDataList = Lists.newArrayList(iter);
+ StructLikeSet set = StructLikeSet.create(projected.asStruct());
+ rowDataList.forEach(rowData -> {
+ RowDataWrapper wrapper = new RowDataWrapper(rowType, projected.asStruct());
+ set.add(wrapper.wrap(rowData));
+ });
+ return set;
+ } catch (Exception e) {
+ throw new IOException("Failed to collect result", e);
+ }
+ }
+ }
+
+}