Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/24 23:43:21 UTC

[iceberg] branch master updated: Flink: FLIP-27 Iceberg source and builder (#5109)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 35b8558d7 Flink: FLIP-27 Iceberg source and builder (#5109)
35b8558d7 is described below

commit 35b8558d7c0b5564fb3d47f92721534f45850cd6
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Fri Jun 24 16:43:16 2022 -0700

    Flink: FLIP-27 Iceberg source and builder (#5109)
---
 .../iceberg/flink/data/FlinkParquetReaders.java    |   8 +-
 .../apache/iceberg/flink/source/IcebergSource.java | 290 +++++++++++++++++
 .../apache/iceberg/flink/source/ScanContext.java   |   2 +-
 .../flink/source/assigner/SimpleSplitAssigner.java |  14 +-
 .../enumerator/ContinuousSplitPlannerImpl.java     |   8 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  55 ++++
 .../org/apache/iceberg/flink/TestFixtures.java     |  11 +
 .../iceberg/flink/data/RowDataToRowMapper.java     |  51 +++
 .../flink/source/TestIcebergSourceBounded.java     | 126 ++++++++
 .../flink/source/TestIcebergSourceContinuous.java  | 353 +++++++++++++++++++++
 .../flink/source/TestIcebergSourceFailover.java    | 299 +++++++++++++++++
 .../source/TestIcebergSourceReaderDeletes.java     | 115 +++++++
 12 files changed, 1321 insertions(+), 11 deletions(-)
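
For context before the diff: assembled from the new tests in this commit (TestIcebergSourceBounded and TestIcebergSourceContinuous), a bounded read with the new FLIP-27 source looks roughly like the sketch below. The table location and class name are placeholders and the reader-function arguments mirror the test defaults; treat this as an illustration of the builder API, not recommended production settings.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;
    import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
    import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;

    public class IcebergSourceUsageSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder location; any TableLoader (Hadoop- or catalog-based) works here.
        String location = "hdfs://namenode:8020/warehouse/db/table";

        // Load the table once to obtain its schema, FileIO, and encryption manager.
        Table table;
        try (TableLoader loader = TableLoader.fromHadoopTable(location)) {
          loader.open();
          table = loader.loadTable();
        }

        // Bounded batch read of the current table state, as in TestIcebergSourceBounded.
        IcebergSource<RowData> source = IcebergSource.<RowData>builder()
            .tableLoader(TableLoader.fromHadoopTable(location))
            .assignerFactory(new SimpleSplitAssignerFactory())
            .readerFunction(new RowDataReaderFunction(new Configuration(), table.schema(),
                null /* no projection */, null /* no name mapping */, false /* case sensitive */,
                table.io(), table.encryption()))
            .build();

        DataStream<RowData> stream = env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "iceberg-source",
            TypeInformation.of(RowData.class));
        stream.print();
        env.execute("FLIP-27 Iceberg source sketch");
      }
    }

A streaming read differs only in the builder calls: streaming(true), monitorInterval(...), and streamingStartingStrategy(...), as exercised by TestIcebergSourceContinuous below.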

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index b0fb3538e..30184d899 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -24,10 +24,12 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RawValueData;
@@ -482,8 +484,10 @@ public class FlinkParquetReaders {
 
     @Override
     protected ArrayData buildList(ReusableArrayData list) {
-      list.setNumElements(writePos);
-      return list;
+      // Since ReusableArrayData is not accepted by Flink, use GenericArrayData temporarily to work around it.
+      // Revert this to use ReusableArrayData once it is fixed in Flink.
+      // For reference, see https://issues.apache.org/jira/browse/FLINK-25238.
+      return new GenericArrayData(Arrays.copyOf(list.values, writePos));
     }
   }
 
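(Aside, not part of the commit: the comment above motivates the copy-on-build workaround. The minimal standalone sketch below, with an illustrative buffer and writePos, shows the behavior it relies on: copying only the written prefix hands Flink a correctly sized, unshared array.)

    import java.util.Arrays;
    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.GenericArrayData;

    public class CopyOnBuildSketch {
      public static void main(String[] args) {
        // A reusable buffer is often longer than the number of elements actually written.
        Object[] reusedBuffer = new Object[] {10, 20, 30, null, null};
        int writePos = 3; // only the first three slots are valid for this list

        // Copy the written prefix so Flink receives an array of exactly writePos elements,
        // rather than a shared buffer with stale trailing slots (see FLINK-25238).
        ArrayData list = new GenericArrayData(Arrays.copyOf(reusedBuffer, writePos));
        System.out.println(list.size()); // prints 3
      }
    }
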
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
new file mode 100644
index 000000000..1d604cf50
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.ReaderMetricsContext;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
+
+  private final TableLoader tableLoader;
+  private final ScanContext scanContext;
+  private final ReaderFunction<T> readerFunction;
+  private final SplitAssignerFactory assignerFactory;
+
+  IcebergSource(
+      TableLoader tableLoader,
+      ScanContext scanContext,
+      ReaderFunction<T> readerFunction,
+      SplitAssignerFactory assignerFactory) {
+    this.tableLoader = tableLoader;
+    this.scanContext = scanContext;
+    this.readerFunction = readerFunction;
+    this.assignerFactory = assignerFactory;
+  }
+
+  private static Table loadTable(TableLoader tableLoader) {
+    tableLoader.open();
+    try (TableLoader loader = tableLoader) {
+      return loader.loadTable();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table loader", e);
+    }
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
+  }
+
+  @Override
+  public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
+    ReaderMetricsContext readerMetrics =
+        new ReaderMetricsContext(readerContext.metricGroup());
+    return new IcebergSourceReader<>(readerFunction, readerContext, readerMetrics);
+  }
+
+  @Override
+  public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
+    return createEnumerator(enumContext, null);
+  }
+
+  @Override
+  public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext, IcebergEnumeratorState enumState) {
+    return createEnumerator(enumContext, enumState);
+  }
+
+  @Override
+  public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
+    return IcebergSourceSplitSerializer.INSTANCE;
+  }
+
+  @Override
+  public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
+    return IcebergEnumeratorStateSerializer.INSTANCE;
+  }
+
+  private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumContext,
+      @Nullable IcebergEnumeratorState enumState) {
+    Table table = loadTable(tableLoader);
+    SplitAssigner assigner;
+    if (enumState == null) {
+      assigner = assignerFactory.createAssigner();
+    } else {
+      LOG.info("Iceberg source restored {} splits from state for table {}",
+          enumState.pendingSplits().size(), table.name());
+      assigner = assignerFactory.createAssigner(enumState.pendingSplits());
+    }
+
+    if (scanContext.isStreaming()) {
+      // Ideally, operatorId should be used as the threadPoolName because Flink guarantees its uniqueness within a job.
+      // SplitEnumeratorContext doesn't expose the OperatorCoordinator.Context, which would contain the OperatorID.
+      // We need to discuss with the Flink community whether it is OK to expose a public API like the protected method
+      // "OperatorCoordinator.Context getCoordinatorContext()" from the SourceCoordinatorContext implementation.
+      // For now, <table name>-<random UUID> is used as the unique thread pool name.
+      ContinuousSplitPlanner splitPlanner = new ContinuousSplitPlannerImpl(
+          table, scanContext, table.name() + "-" + UUID.randomUUID());
+      return new ContinuousIcebergEnumerator(enumContext, assigner, scanContext, splitPlanner, enumState);
+    } else {
+      return new StaticIcebergEnumerator(enumContext, assigner, table, scanContext, enumState);
+    }
+  }
+
+  public static <T> Builder<T> builder() {
+    return new Builder<>();
+  }
+
+  public static class Builder<T> {
+
+    // required
+    private TableLoader tableLoader;
+    private SplitAssignerFactory splitAssignerFactory;
+    private ReaderFunction<T> readerFunction;
+
+    // optional
+    private final ScanContext.Builder contextBuilder = ScanContext.builder();
+
+    Builder() {
+    }
+
+    public Builder<T> tableLoader(TableLoader loader) {
+      this.tableLoader = loader;
+      return this;
+    }
+
+    public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
+      this.splitAssignerFactory = assignerFactory;
+      return this;
+    }
+
+    public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
+      this.readerFunction = newReaderFunction;
+      return this;
+    }
+
+    public Builder caseSensitive(boolean newCaseSensitive) {
+      this.contextBuilder.caseSensitive(newCaseSensitive);
+      return this;
+    }
+
+    public Builder useSnapshotId(Long newSnapshotId) {
+      this.contextBuilder.useSnapshotId(newSnapshotId);
+      return this;
+    }
+
+    public Builder streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
+      this.contextBuilder.startingStrategy(newStartingStrategy);
+      return this;
+    }
+
+    public Builder startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
+      this.contextBuilder.startSnapshotTimestamp(newStartSnapshotTimestamp);
+      return this;
+    }
+
+    public Builder startSnapshotId(Long newStartSnapshotId) {
+      this.contextBuilder.startSnapshotId(newStartSnapshotId);
+      return this;
+    }
+
+    public Builder endSnapshotId(Long newEndSnapshotId) {
+      this.contextBuilder.endSnapshotId(newEndSnapshotId);
+      return this;
+    }
+
+    public Builder asOfTimestamp(Long newAsOfTimestamp) {
+      this.contextBuilder.asOfTimestamp(newAsOfTimestamp);
+      return this;
+    }
+
+    public Builder splitSize(Long newSplitSize) {
+      this.contextBuilder.splitSize(newSplitSize);
+      return this;
+    }
+
+    public Builder splitLookback(Integer newSplitLookback) {
+      this.contextBuilder.splitLookback(newSplitLookback);
+      return this;
+    }
+
+    public Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+      this.contextBuilder.splitOpenFileCost(newSplitOpenFileCost);
+      return this;
+    }
+
+    public Builder streaming(boolean streaming) {
+      this.contextBuilder.streaming(streaming);
+      return this;
+    }
+
+    public Builder monitorInterval(Duration newMonitorInterval) {
+      this.contextBuilder.monitorInterval(newMonitorInterval);
+      return this;
+    }
+
+    public Builder nameMapping(String newNameMapping) {
+      this.contextBuilder.nameMapping(newNameMapping);
+      return this;
+    }
+
+    public Builder project(Schema newProjectedSchema) {
+      this.contextBuilder.project(newProjectedSchema);
+      return this;
+    }
+
+    public Builder filters(List<Expression> newFilters) {
+      this.contextBuilder.filters(newFilters);
+      return this;
+    }
+
+    public Builder limit(long newLimit) {
+      this.contextBuilder.limit(newLimit);
+      return this;
+    }
+
+    public Builder includeColumnStats(boolean newIncludeColumnStats) {
+      this.contextBuilder.includeColumnStats(newIncludeColumnStats);
+      return this;
+    }
+
+    public Builder planParallelism(int planParallelism) {
+      this.contextBuilder.planParallelism(planParallelism);
+      return this;
+    }
+
+    public Builder properties(Map<String, String> properties) {
+      contextBuilder.fromProperties(properties);
+      return this;
+    }
+
+    public IcebergSource<T> build() {
+      checkRequired();
+      return new IcebergSource<T>(
+          tableLoader,
+          contextBuilder.build(),
+          readerFunction,
+          splitAssignerFactory);
+    }
+
+    private void checkRequired() {
+      Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+      Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
+      Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 905205c88..2fe47cc7f 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -163,7 +163,7 @@ public class ScanContext implements Serializable {
     return snapshotId;
   }
 
-  public StreamingStartingStrategy startingStrategy() {
+  public StreamingStartingStrategy streamingStartingStrategy() {
     return startingStrategy;
   }
 
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
index 51ce3d113..ed70ad377 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
@@ -63,14 +63,20 @@ public class SimpleSplitAssigner implements SplitAssigner {
 
   @Override
   public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
-    pendingSplits.addAll(splits);
-    completeAvailableFuturesIfNeeded();
+    addSplits(splits);
   }
 
   @Override
   public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
-    pendingSplits.addAll(splits);
-    completeAvailableFuturesIfNeeded();
+    addSplits(splits);
+  }
+
+  private void addSplits(Collection<IcebergSourceSplit> splits) {
+    if (!splits.isEmpty()) {
+      pendingSplits.addAll(splits);
+      // only complete the pending future if new splits are added
+      completeAvailableFuturesIfNeeded();
+    }
   }
 
   /**
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
index 4d6e89684..2bbaaf940 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java
@@ -123,10 +123,10 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
 
     Snapshot startSnapshot = startSnapshotOptional.get();
     LOG.info("Get starting snapshot id {} based on strategy {}",
-        startSnapshot.snapshotId(), scanContext.startingStrategy());
+        startSnapshot.snapshotId(), scanContext.streamingStartingStrategy());
     List<IcebergSourceSplit> splits;
     IcebergEnumeratorPosition toPosition;
-    if (scanContext.startingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
+    if (scanContext.streamingStartingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
       // do a batch table scan first
       splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
       LOG.info("Discovered {} splits from initial batch table scan with snapshot Id {}",
@@ -161,7 +161,7 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    */
   @VisibleForTesting
   static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
-    switch (scanContext.startingStrategy()) {
+    switch (scanContext.streamingStartingStrategy()) {
       case TABLE_SCAN_THEN_INCREMENTAL:
       case INCREMENTAL_FROM_LATEST_SNAPSHOT:
         return Optional.ofNullable(table.currentSnapshot());
@@ -183,7 +183,7 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
           return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
         }
       default:
-        throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.startingStrategy());
+        throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.streamingStartingStrategy());
     }
   }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 92e935f25..a6f70453b 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.flink;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -193,6 +194,60 @@ public class SimpleDataUtil {
     assertTableRecords(table, convertToRecords(expected));
   }
 
+  /**
+   * Get all rows for a table
+   */
+  public static List<Record> tableRecords(Table table) throws IOException {
+    table.refresh();
+    List<Record> records = Lists.newArrayList();
+    try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+      for (Record record : iterable) {
+        records.add(record);
+      }
+    }
+    return records;
+  }
+
+  private static boolean equalsRecords(List<Record> expected, List<Record> actual, Schema schema) {
+    if (expected.size() != actual.size()) {
+      return false;
+    }
+    Types.StructType type = schema.asStruct();
+    StructLikeSet expectedSet = StructLikeSet.create(type);
+    expectedSet.addAll(expected);
+    StructLikeSet actualSet = StructLikeSet.create(type);
+    actualSet.addAll(actual);
+    return expectedSet.equals(actualSet);
+  }
+
+  private static void assertRecordsEqual(List<Record> expected, List<Record> actual, Schema schema) {
+    Assert.assertEquals(expected.size(), actual.size());
+    Types.StructType type = schema.asStruct();
+    StructLikeSet expectedSet = StructLikeSet.create(type);
+    expectedSet.addAll(expected);
+    StructLikeSet actualSet = StructLikeSet.create(type);
+    actualSet.addAll(actual);
+    Assert.assertEquals(expectedSet, actualSet);
+  }
+
+  /**
+   * Assert that the table contains the expected records, polling up to
+   * {@code maxCheckCount} times with {@code checkInterval} between checks.
+   */
+  public static void assertTableRecords(
+      Table table, List<Record> expected, Duration checkInterval, int maxCheckCount)
+      throws IOException, InterruptedException {
+    for (int i = 0; i < maxCheckCount; ++i) {
+      if (equalsRecords(expected, tableRecords(table), table.schema())) {
+        break;
+      } else {
+        Thread.sleep(checkInterval.toMillis());
+      }
+    }
+    // success or failure, assert on the latest table state
+    assertRecordsEqual(tableRecords(table), expected, table.schema());
+  }
+
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
     table.refresh();
 
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
index 71532c594..c3c280cec 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFixtures.java
@@ -47,6 +47,17 @@ public class TestFixtures {
 
   public static final String DATABASE = "default";
   public static final String TABLE = "t";
+  public static final String SINK_TABLE = "t_sink";
 
   public static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DATABASE, TABLE);
+
+  public static final Schema TS_SCHEMA = new Schema(
+      required(1, "ts", Types.TimestampType.withoutZone()),
+      required(2, "str", Types.StringType.get()));
+
+  public static final PartitionSpec TS_SPEC = PartitionSpec.builderFor(TS_SCHEMA)
+      .hour("ts")
+      .build();
+
+  public static final RowType TS_ROW_TYPE = FlinkSchemaUtil.convert(TS_SCHEMA);
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/data/RowDataToRowMapper.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/data/RowDataToRowMapper.java
new file mode 100644
index 000000000..549a6ed3a
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/data/RowDataToRowMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+
+public class RowDataToRowMapper extends RichMapFunction<RowData, Row> {
+
+  private final RowType rowType;
+
+  private transient DataStructureConverter<Object, Object> converter;
+
+  public RowDataToRowMapper(RowType rowType) {
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.converter = DataStructureConverters.getConverter(
+        TypeConversions.fromLogicalToDataType(rowType));
+  }
+
+  @Override
+  public Row map(RowData value) throws Exception {
+    return (Row) converter.toExternal(value);
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
new file mode 100644
index 000000000..5c8243ada
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergSourceBounded extends TestFlinkScan {
+
+  public TestIcebergSourceBounded(String fileFormat) {
+    super(fileFormat);
+  }
+
+  @Override
+  protected List<Row> runWithProjection(String... projected) throws Exception {
+    Schema icebergTableSchema = catalog.loadTable(TestFixtures.TABLE_IDENTIFIER).schema();
+    TableSchema.Builder builder = TableSchema.builder();
+    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTableSchema));
+    for (String field : projected) {
+      TableColumn column = schema.getTableColumn(field).get();
+      builder.field(column.getName(), column.getType());
+    }
+    TableSchema flinkSchema = builder.build();
+    Schema projectedSchema = FlinkSchemaUtil.convert(icebergTableSchema, flinkSchema);
+    return run(projectedSchema, null, null);
+  }
+
+  @Override
+  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+    return run(null, Arrays.asList(filter), null);
+  }
+
+  @Override
+  protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
+    return run(null, null, options);
+  }
+
+  @Override
+  protected List<Row> run() throws Exception {
+    return run(null, null, null);
+  }
+
+  private List<Row> run(Schema projectedSchema, List<Expression> filters,
+                        Map<String, String> options) throws Exception {
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(1);
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+    Table table;
+    try (TableLoader tableLoader = tableLoader()) {
+      tableLoader.open();
+      table = tableLoader.loadTable();
+    }
+
+    IcebergSource.Builder<RowData> sourceBuilder = IcebergSource.<RowData>builder()
+        .tableLoader(tableLoader())
+        .assignerFactory(new SimpleSplitAssignerFactory())
+        .readerFunction(new RowDataReaderFunction(config, table.schema(), projectedSchema,
+            null, false, table.io(), table.encryption()));
+    if (projectedSchema != null) {
+      sourceBuilder.project(projectedSchema);
+    }
+    if (filters != null) {
+      sourceBuilder.filters(filters);
+    }
+    if (options != null) {
+      sourceBuilder.properties(options);
+    }
+
+    DataStream<Row> stream = env.fromSource(
+        sourceBuilder.build(),
+        WatermarkStrategy.noWatermarks(),
+        "testBasicRead",
+        TypeInformation.of(RowData.class))
+        .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(
+            projectedSchema == null ? table.schema() : projectedSchema)));
+
+    try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
+      return Lists.newArrayList(iter);
+    }
+  }
+
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
new file mode 100644
index 000000000..80475ee4f
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceContinuous {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+      TestFixtures.DATABASE, TestFixtures.TABLE, TestFixtures.SCHEMA);
+
+  private final AtomicLong randomSeed = new AtomicLong(0L);
+
+  @Test
+  public void testTableScanThenIncremental() throws Exception {
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // snapshot1
+    List<Record> batch1 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch1);
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(10L))
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+
+    try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> result1 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+      // snapshot2
+      List<Record> batch2 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch2);
+      tableResource.table().currentSnapshot().snapshotId();
+
+      List<Row> result2 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+      // snapshot3
+      List<Record> batch3 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch3);
+      tableResource.table().currentSnapshot().snapshotId();
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+    }
+  }
+
+  @Test
+  public void testEarliestSnapshot() throws Exception {
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // snapshot0
+    List<Record> batch0 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch0);
+
+    // snapshot1
+    List<Record> batch1 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch1);
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(10L))
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+        .build();
+
+    try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> result1 = waitForResult(iter, 4);
+      List<Record> combinedBatch0AndBatch1 = Lists.newArrayList(batch0);
+      combinedBatch0AndBatch1.addAll(batch1);
+      TestHelpers.assertRecords(result1, combinedBatch0AndBatch1, tableResource.table().schema());
+
+      // snapshot2
+      List<Record> batch2 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch2);
+
+      List<Row> result2 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+      // snapshot3
+      List<Record> batch3 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+    }
+  }
+
+  @Test
+  public void testLatestSnapshot() throws Exception {
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // snapshot0
+    List<Record> batch0 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch0);
+
+    // snapshot1
+    List<Record> batch1 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch1);
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(10L))
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+        .build();
+
+    try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      // make sure the job is running first so that the enumerator starts
+      // from the latest snapshot before batch2 is inserted below.
+      waitUntilJobIsRunning(MINI_CLUSTER_RESOURCE.getClusterClient());
+
+      // inclusive behavior for starting snapshot
+      List<Row> result1 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+      // snapshot2
+      List<Record> batch2 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch2);
+
+      List<Row> result2 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+      // snapshot3
+      List<Record> batch3 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+    }
+  }
+
+  @Test
+  public void testSpecificSnapshotId() throws Exception {
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // snapshot0
+    List<Record> batch0 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch0);
+    long snapshot0 = tableResource.table().currentSnapshot().snapshotId();
+
+    // snapshot1
+    List<Record> batch1 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch1);
+    long snapshot1 = tableResource.table().currentSnapshot().snapshotId();
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(10L))
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+        .startSnapshotId(snapshot1)
+        .build();
+
+    try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> result1 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+      // snapshot2
+      List<Record> batch2 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch2);
+
+      List<Row> result2 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+      // snapshot3
+      List<Record> batch3 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+    }
+  }
+
+  @Test
+  public void testSpecificSnapshotTimestamp() throws Exception {
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+
+    // snapshot0
+    List<Record> batch0 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch0);
+    long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis();
+
+    // sleep for 2 ms to make sure snapshot1 has a higher timestamp value
+    Thread.sleep(2);
+
+    // snapshot1
+    List<Record> batch1 = RandomGenericData.generate(
+        tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batch1);
+    long snapshot1Timestamp = tableResource.table().currentSnapshot().timestampMillis();
+
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .monitorInterval(Duration.ofMillis(10L))
+        .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
+        .startSnapshotTimestamp(snapshot1Timestamp)
+        .build();
+
+    try (CloseableIterator<Row> iter = createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      // consume data from snapshot1
+      List<Row> result1 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result1, batch1, tableResource.table().schema());
+
+      // snapshot2
+      List<Record> batch2 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch2);
+
+      List<Row> result2 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result2, batch2, tableResource.table().schema());
+
+      // snapshot3
+      List<Record> batch3 = RandomGenericData.generate(
+          tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(batch3);
+
+      List<Row> result3 = waitForResult(iter, 2);
+      TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+    }
+  }
+
+  private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
+    Table table = tableResource.table();
+    Configuration config = new Configuration();
+    // start the source and collect output
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(1);
+    DataStream<Row> stream = env.fromSource(
+        IcebergSource.<RowData>builder()
+            .tableLoader(tableResource.tableLoader())
+            .assignerFactory(new SimpleSplitAssignerFactory())
+            .readerFunction(new RowDataReaderFunction(config, table.schema(), null,
+                null, false, table.io(), table.encryption()))
+            .streaming(scanContext.isStreaming())
+            .streamingStartingStrategy(scanContext.streamingStartingStrategy())
+            .startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
+            .startSnapshotId(scanContext.startSnapshotId())
+            .monitorInterval(Duration.ofMillis(10L))
+            .build(),
+        WatermarkStrategy.noWatermarks(),
+        "icebergSource",
+        TypeInformation.of(RowData.class))
+        .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(tableResource.table().schema())));
+    return stream;
+  }
+
+  public static List<Row> waitForResult(CloseableIterator<Row> iter, int limit) {
+    List<Row> results = Lists.newArrayListWithCapacity(limit);
+    while (results.size() < limit) {
+      if (iter.hasNext()) {
+        results.add(iter.next());
+      } else {
+        break;
+      }
+    }
+    return results;
+  }
+
+  public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+    while (getRunningJobs(client).isEmpty()) {
+      Thread.sleep(10);
+    }
+  }
+
+  public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+    Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+    return statusMessages.stream()
+        .filter(status -> status.getJobState() == JobStatus.RUNNING)
+        .map(JobStatusMessage::getJobId)
+        .collect(Collectors.toList());
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
new file mode 100644
index 000000000..f6f522c9d
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.HadoopTableResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceFailover {
+
+  private static final int PARALLELISM = 4;
+
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+
+  @Rule
+  public final MiniClusterWithClientResource miniClusterResource =
+      new MiniClusterWithClientResource(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(1)
+              .setNumberSlotsPerTaskManager(PARALLELISM)
+              .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+              .withHaLeadershipControl()
+              .build());
+
+  @Rule
+  public final HadoopTableResource sourceTableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+      TestFixtures.DATABASE, TestFixtures.TABLE, schema());
+
+  @Rule
+  public final HadoopTableResource sinkTableResource = new HadoopTableResource(TEMPORARY_FOLDER,
+      TestFixtures.DATABASE, TestFixtures.SINK_TABLE, schema());
+
+  protected IcebergSource.Builder<RowData> sourceBuilder() {
+    Table sourceTable = sourceTableResource.table();
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+    return IcebergSource.<RowData>builder()
+        .tableLoader(sourceTableResource.tableLoader())
+        .assignerFactory(new SimpleSplitAssignerFactory())
+        .readerFunction(new RowDataReaderFunction(config, sourceTable.schema(), null,
+            null, false, sourceTable.io(), sourceTable.encryption()));
+  }
+
+  protected Schema schema() {
+    return TestFixtures.SCHEMA;
+  }
+
+  protected List<Record> generateRecords(int numRecords, long seed) {
+    return RandomGenericData.generate(schema(), numRecords, seed);
+  }
+
+  protected void assertRecords(Table table, List<Record> expectedRecords, Duration interval, int maxCount)
+      throws Exception {
+    SimpleDataUtil.assertTableRecords(table,
+        expectedRecords, interval, maxCount);
+  }
+
+  @Test
+  public void testBoundedWithTaskManagerFailover() throws Exception {
+    testBoundedIcebergSource(FailoverType.TM);
+  }
+
+  @Test
+  public void testBoundedWithJobManagerFailover() throws Exception {
+    testBoundedIcebergSource(FailoverType.JM);
+  }
+
+  private void testBoundedIcebergSource(FailoverType failoverType) throws Exception {
+    List<Record> expectedRecords = Lists.newArrayList();
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+    for (int i = 0; i < 4; ++i) {
+      List<Record> records = generateRecords(2, i);
+      expectedRecords.addAll(records);
+      dataAppender.appendToTable(records);
+    }
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(PARALLELISM);
+    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+    DataStream<RowData> stream = env.fromSource(
+        sourceBuilder().build(),
+        WatermarkStrategy.noWatermarks(),
+        "IcebergSource",
+        TypeInformation.of(RowData.class));
+
+    DataStream<RowData> streamFailingInTheMiddleOfReading =
+        RecordCounterToFail.wrapWithFailureAfter(stream, expectedRecords.size() / 2);
+
+    // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
+    // exactly-once behavior. With an Iceberg sink, we can verify exactly-once
+    // end to end. Here we mainly care about the source's exactly-once behavior.
+    FlinkSink.forRowData(streamFailingInTheMiddleOfReading)
+        .table(sinkTableResource.table())
+        .tableLoader(sinkTableResource.tableLoader())
+        .append();
+
+    JobClient jobClient = env.executeAsync("Bounded Iceberg Source Failover Test");
+    JobID jobId = jobClient.getJobID();
+
+    RecordCounterToFail.waitToFail();
+    triggerFailover(
+        failoverType,
+        jobId,
+        RecordCounterToFail::continueProcessing,
+        miniClusterResource.getMiniCluster());
+
+    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+  }
+
+  @Test
+  public void testContinuousWithTaskManagerFailover() throws Exception {
+    testContinuousIcebergSource(FailoverType.TM);
+  }
+
+  @Test
+  public void testContinuousWithJobManagerFailover() throws Exception {
+    testContinuousIcebergSource(FailoverType.JM);
+  }
+
+  private void testContinuousIcebergSource(FailoverType failoverType) throws Exception {
+    GenericAppenderHelper dataAppender = new GenericAppenderHelper(
+        sourceTableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
+    List<Record> expectedRecords = Lists.newArrayList();
+
+    List<Record> batch = generateRecords(2, 0);
+    expectedRecords.addAll(batch);
+    dataAppender.appendToTable(batch);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(PARALLELISM);
+    env.enableCheckpointing(10L);
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+
+    DataStream<RowData> stream = env.fromSource(
+        sourceBuilder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10))
+            .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .build(),
+        WatermarkStrategy.noWatermarks(),
+        "IcebergSource",
+        TypeInformation.of(RowData.class));
+
+    // CollectStreamSink from DataStream#executeAndCollect() doesn't guarantee
+    // exactly-once behavior. With an Iceberg sink, we can verify exactly-once
+    // end to end. Here we mainly care about the source's exactly-once behavior.
+    FlinkSink.forRowData(stream)
+        .table(sinkTableResource.table())
+        .tableLoader(sinkTableResource.tableLoader())
+        .append();
+
+    JobClient jobClient = env.executeAsync("Continuous Iceberg Source Failover Test");
+    JobID jobId = jobClient.getJobID();
+
+    for (int i = 1; i < 5; i++) {
+      Thread.sleep(10);
+      List<Record> records = generateRecords(2, i);
+      expectedRecords.addAll(records);
+      dataAppender.appendToTable(records);
+      if (i == 2) {
+        triggerFailover(failoverType, jobId, () -> {
+        }, miniClusterResource.getMiniCluster());
+      }
+    }
+
+    // wait longer for continuous source to reduce flakiness
+    // because CI servers tend to be overloaded.
+    assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofMillis(10), 12000);
+  }
+
+  // ------------------------------------------------------------------------
+  // test utilities copied from Flink's FileSourceTextLinesITCase
+  // ------------------------------------------------------------------------
+
+  private enum FailoverType {
+    NONE,
+    TM,
+    JM
+  }
+
+  private static void triggerFailover(
+      FailoverType type, JobID jobId, Runnable afterFailAction, MiniCluster miniCluster)
+      throws Exception {
+    switch (type) {
+      case NONE:
+        afterFailAction.run();
+        break;
+      case TM:
+        restartTaskManager(afterFailAction, miniCluster);
+        break;
+      case JM:
+        triggerJobManagerFailover(jobId, afterFailAction, miniCluster);
+        break;
+    }
+  }
+
+  private static void triggerJobManagerFailover(
+      JobID jobId, Runnable afterFailAction, MiniCluster miniCluster) throws Exception {
+    HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
+    haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+    afterFailAction.run();
+    haLeadershipControl.grantJobMasterLeadership(jobId).get();
+  }
+
+  private static void restartTaskManager(Runnable afterFailAction, MiniCluster miniCluster)
+      throws Exception {
+    miniCluster.terminateTaskManager(0).get();
+    afterFailAction.run();
+    miniCluster.startTaskManager();
+  }
+
+  private static class RecordCounterToFail {
+
+    private static AtomicInteger records;
+    private static CompletableFuture<Void> fail;
+    private static CompletableFuture<Void> continueProcessing;
+
+    private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
+
+      records = new AtomicInteger();
+      fail = new CompletableFuture<>();
+      continueProcessing = new CompletableFuture<>();
+      return stream.map(
+          record -> {
+            boolean reachedFailPoint = records.incrementAndGet() > failAfter;
+            boolean notFailedYet = !fail.isDone();
+            if (notFailedYet && reachedFailPoint) {
+              fail.complete(null);
+              continueProcessing.get();
+            }
+            return record;
+          });
+    }
+
+    private static void waitToFail() throws ExecutionException, InterruptedException {
+      fail.get();
+    }
+
+    private static void continueProcessing() {
+      continueProcessing.complete(null);
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java
new file mode 100644
index 000000000..08fb159dd
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceReaderDeletes.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceReaderDeletes extends TestFlinkReaderDeletesBase {
+
+  private static final int PARALLELISM = 4;
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
+      new MiniClusterResourceConfiguration.Builder()
+          .setNumberTaskManagers(1)
+          .setNumberSlotsPerTaskManager(PARALLELISM)
+          .build());
+
+  public TestIcebergSourceReaderDeletes(FileFormat inputFormat) {
+    super(inputFormat);
+  }
+
+  @Override
+  protected StructLikeSet rowSet(String tableName, Table testTable, String... columns) throws IOException {
+    Schema projected = testTable.schema().select(columns);
+    RowType rowType = FlinkSchemaUtil.convert(projected);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
+    properties.put(CatalogProperties.URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+    properties.put(CatalogProperties.CLIENT_POOL_SIZE,
+        Integer.toString(hiveConf.getInt("iceberg.hive.client-pool-size", 5)));
+    CatalogLoader hiveCatalogLoader = CatalogLoader.hive(catalog.name(), hiveConf, properties);
+    TableLoader hiveTableLoader = TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("default", tableName));
+    hiveTableLoader.open();
+    try (TableLoader tableLoader = hiveTableLoader) {
+      Configuration config = new Configuration();
+      Table table = tableLoader.loadTable();
+      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+      env.setParallelism(1);
+      DataStream<RowData> stream = env.fromSource(
+          IcebergSource.<RowData>builder()
+              .tableLoader(tableLoader)
+              .assignerFactory(new SimpleSplitAssignerFactory())
+              .readerFunction(new RowDataReaderFunction(config, table.schema(), projected,
+                  null, false, table.io(), table.encryption()))
+              .project(projected)
+              .build(),
+          WatermarkStrategy.noWatermarks(),
+          "testBasicRead",
+          TypeInformation.of(RowData.class));
+
+      try (CloseableIterator<RowData> iter = stream.executeAndCollect()) {
+        List<RowData> rowDataList = Lists.newArrayList(iter);
+        StructLikeSet set = StructLikeSet.create(projected.asStruct());
+        rowDataList.forEach(rowData -> {
+          RowDataWrapper wrapper = new RowDataWrapper(rowType, projected.asStruct());
+          set.add(wrapper.wrap(rowData));
+        });
+        return set;
+      } catch (Exception e) {
+        throw new IOException("Failed to collect result", e);
+      }
+    }
+  }
+
+}