Posted to commits@iceberg.apache.org by op...@apache.org on 2021/01/14 04:35:36 UTC

[iceberg] branch master updated: Flink: Support streaming reader. (#1793)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 14331c4  Flink: Support streaming reader. (#1793)
14331c4 is described below

commit 14331c4e5f61e14cdc527a567f5dafc7fd95c3e7
Author: openinx <op...@gmail.com>
AuthorDate: Thu Jan 14 12:35:25 2021 +0800

    Flink: Support streaming reader. (#1793)
---
 .../apache/iceberg/data/GenericAppenderHelper.java |   2 +-
 .../iceberg/flink/source/FlinkInputFormat.java     |   4 +-
 .../iceberg/flink/source/FlinkInputSplit.java      |   9 +
 .../apache/iceberg/flink/source/FlinkSource.java   |  74 ++---
 .../iceberg/flink/source/FlinkSplitGenerator.java  |   6 +-
 .../apache/iceberg/flink/source/ScanContext.java   | 268 ++++++++++++------
 .../flink/source/StreamingMonitorFunction.java     | 177 ++++++++++++
 .../flink/source/StreamingReaderOperator.java      | 232 ++++++++++++++++
 .../org/apache/iceberg/flink/FlinkTestBase.java    |  24 +-
 .../org/apache/iceberg/flink/TestTableLoader.java  |   4 +
 .../iceberg/flink/source/TestStreamScanSql.java    | 241 +++++++++++++++++
 .../flink/source/TestStreamingMonitorFunction.java | 300 +++++++++++++++++++++
 .../flink/source/TestStreamingReaderOperator.java  | 283 +++++++++++++++++++
 13 files changed, 1491 insertions(+), 133 deletions(-)
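
For readers skimming this mail: the new streaming read is exercised end-to-end by TestStreamScanSql below via SQL hints. The following is a minimal sketch of that usage from a regular Flink job; it is not part of this commit, the catalog/database/table names are hypothetical placeholders, and only the 'streaming' and 'monitor-interval' options come from the code added here.

    // Sketch only: 'iceberg_catalog.db.tbl' is a placeholder; the hint options are defined in ScanContext below.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class StreamingScanSqlSketch {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L);  // checkpoints back the monitor/reader operator state

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Dynamic table options must be enabled for the /*+ OPTIONS(...) */ hint, as in TestStreamScanSql.
        tEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true");

        TableResult result = tEnv.executeSql(
            "SELECT * FROM iceberg_catalog.db.tbl " +
            "/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */");
        result.print();  // unbounded scan: keeps emitting rows as new snapshots are committed
      }
    }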

diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index c32be08..299135a 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -88,7 +88,7 @@ public class GenericAppenderHelper {
     return DataFiles.builder(table.spec())
         .withRecordCount(records.size())
         .withFileSizeInBytes(file.length())
-        .withPath(file.toURI().toString())
+        .withPath(Files.localInput(file).location())
         .withMetrics(appender.metrics())
         .withFormat(format)
         .withPartition(partition)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index f25af65..1bad1c2 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -61,7 +61,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
 
   @VisibleForTesting
   Schema projectedSchema() {
-    return context.projectedSchema();
+    return context.project();
   }
 
   @Override
@@ -92,7 +92,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
   @Override
   public void open(FlinkInputSplit split) {
     this.iterator = new RowDataIterator(
-        split.getTask(), io, encryption, tableSchema, context.projectedSchema(), context.nameMapping(),
+        split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
         context.caseSensitive());
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
index 21a2f71..b59574f 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.source;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
 /**
  * TODO Implement {@link LocatableInputSplit}.
@@ -44,4 +45,12 @@ public class FlinkInputSplit implements InputSplit {
   CombinedScanTask getTask() {
     return task;
   }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("splitNumber", splitNumber)
+        .add("task", task)
+        .toString();
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index e4b5907..95a7ba9 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -23,12 +23,12 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
@@ -70,10 +70,7 @@ public class FlinkSource {
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
-    private long limit;
-    private ScanContext context = new ScanContext();
-
-    private RowDataTypeInfo rowTypeInfo;
+    private final ScanContext.Builder contextBuilder = ScanContext.builder();
 
     public Builder tableLoader(TableLoader newLoader) {
       this.tableLoader = newLoader;
@@ -91,7 +88,7 @@ public class FlinkSource {
     }
 
     public Builder filters(List<Expression> filters) {
-      this.context = context.filterRows(filters);
+      contextBuilder.filters(filters);
       return this;
     }
 
@@ -101,57 +98,62 @@ public class FlinkSource {
     }
 
     public Builder limit(long newLimit) {
-      this.limit = newLimit;
+      contextBuilder.limit(newLimit);
       return this;
     }
 
     public Builder properties(Map<String, String> properties) {
-      this.context = context.fromProperties(properties);
+      contextBuilder.fromProperties(properties);
       return this;
     }
 
     public Builder caseSensitive(boolean caseSensitive) {
-      this.context = context.setCaseSensitive(caseSensitive);
+      contextBuilder.caseSensitive(caseSensitive);
       return this;
     }
 
     public Builder snapshotId(Long snapshotId) {
-      this.context = context.useSnapshotId(snapshotId);
+      contextBuilder.useSnapshotId(snapshotId);
       return this;
     }
 
     public Builder startSnapshotId(Long startSnapshotId) {
-      this.context = context.startSnapshotId(startSnapshotId);
+      contextBuilder.startSnapshotId(startSnapshotId);
       return this;
     }
 
     public Builder endSnapshotId(Long endSnapshotId) {
-      this.context = context.endSnapshotId(endSnapshotId);
+      contextBuilder.endSnapshotId(endSnapshotId);
       return this;
     }
 
     public Builder asOfTimestamp(Long asOfTimestamp) {
-      this.context = context.asOfTimestamp(asOfTimestamp);
+      contextBuilder.asOfTimestamp(asOfTimestamp);
       return this;
     }
 
     public Builder splitSize(Long splitSize) {
-      this.context = context.splitSize(splitSize);
+      contextBuilder.splitSize(splitSize);
       return this;
     }
 
     public Builder splitLookback(Integer splitLookback) {
-      this.context = context.splitLookback(splitLookback);
+      contextBuilder.splitLookback(splitLookback);
       return this;
     }
 
     public Builder splitOpenFileCost(Long splitOpenFileCost) {
-      this.context = context.splitOpenFileCost(splitOpenFileCost);
+      contextBuilder.splitOpenFileCost(splitOpenFileCost);
+      return this;
+    }
+
+    public Builder streaming(boolean streaming) {
+      contextBuilder.streaming(streaming);
       return this;
     }
 
     public Builder nameMapping(String nameMapping) {
-      this.context = context.nameMapping(nameMapping);
+      contextBuilder.nameMapping(nameMapping);
       return this;
     }
 
@@ -178,35 +180,37 @@ public class FlinkSource {
         encryption = table.encryption();
       }
 
-      rowTypeInfo = RowDataTypeInfo.of((RowType) (
-          projectedSchema == null ?
-              FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) :
-              projectedSchema).toRowDataType().getLogicalType());
-
-      context = context.project(projectedSchema == null ? icebergSchema :
-          FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
-
-      context = context.limit(limit);
+      if (projectedSchema == null) {
+        contextBuilder.project(icebergSchema);
+      } else {
+        contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
+      }
 
-      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, context);
+      return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
     }
 
     public DataStream<RowData> build() {
       Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
       FlinkInputFormat format = buildFormat();
-      if (isBounded(context)) {
-        return env.createInput(format, rowTypeInfo);
+
+      ScanContext context = contextBuilder.build();
+      TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
+
+      if (!context.isStreaming()) {
+        return env.createInput(format, typeInfo);
       } else {
-        throw new UnsupportedOperationException("The Unbounded mode is not supported yet");
+        StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
+
+        String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
+        String readerOperatorName = String.format("Iceberg table (%s) reader", table);
+
+        return env.addSource(function, monitorFunctionName)
+            .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
       }
     }
   }
 
-  private static boolean isBounded(ScanContext context) {
-    return context.startSnapshotId() == null || context.endSnapshotId() != null;
-  }
-
   public static boolean isBounded(Map<String, String> properties) {
-    return isBounded(new ScanContext().fromProperties(properties));
+    return !ScanContext.builder().fromProperties(properties).build().isStreaming();
   }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
index ade4cfb..f495e09 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
@@ -47,7 +47,7 @@ class FlinkSplitGenerator {
     TableScan scan = table
         .newScan()
         .caseSensitive(context.caseSensitive())
-        .project(context.projectedSchema());
+        .project(context.project());
 
     if (context.snapshotId() != null) {
       scan = scan.useSnapshot(context.snapshotId());
@@ -77,8 +77,8 @@ class FlinkSplitGenerator {
       scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
     }
 
-    if (context.filterExpressions() != null) {
-      for (Expression filter : context.filterExpressions()) {
+    if (context.filters() != null) {
+      for (Expression filter : context.filters()) {
         scan = scan.filter(filter);
       }
     }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 42804a0..2896efb 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.flink.source;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.configuration.ConfigOption;
@@ -61,6 +62,12 @@ class ScanContext implements Serializable {
   private static final ConfigOption<Long> SPLIT_FILE_OPEN_COST =
       ConfigOptions.key("split-file-open-cost").longType().defaultValue(null);
 
+  private static final ConfigOption<Boolean> STREAMING =
+      ConfigOptions.key("streaming").booleanType().defaultValue(false);
+
+  private static final ConfigOption<Duration> MONITOR_INTERVAL =
+      ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
+
   private final boolean caseSensitive;
   private final Long snapshotId;
   private final Long startSnapshotId;
@@ -69,29 +76,18 @@ class ScanContext implements Serializable {
   private final Long splitSize;
   private final Integer splitLookback;
   private final Long splitOpenFileCost;
+  private final boolean isStreaming;
+  private final Duration monitorInterval;
+
   private final String nameMapping;
-  private final Schema projectedSchema;
-  private final List<Expression> filterExpressions;
-  private final Long limit;
-
-  ScanContext() {
-    this.caseSensitive = CASE_SENSITIVE.defaultValue();
-    this.snapshotId = SNAPSHOT_ID.defaultValue();
-    this.startSnapshotId = START_SNAPSHOT_ID.defaultValue();
-    this.endSnapshotId = END_SNAPSHOT_ID.defaultValue();
-    this.asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
-    this.splitSize = SPLIT_SIZE.defaultValue();
-    this.splitLookback = SPLIT_LOOKBACK.defaultValue();
-    this.splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
-    this.nameMapping = null;
-    this.projectedSchema = null;
-    this.filterExpressions = null;
-    this.limit = null;
-  }
+  private final Schema schema;
+  private final List<Expression> filters;
+  private final long limit;
 
   private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
-      Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
-      String nameMapping, Schema projectedSchema, List<Expression> filterExpressions, Long limit) {
+                      Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
+                      boolean isStreaming, Duration monitorInterval, String nameMapping,
+                      Schema schema, List<Expression> filters, long limit) {
     this.caseSensitive = caseSensitive;
     this.snapshotId = snapshotId;
     this.startSnapshotId = startSnapshotId;
@@ -100,126 +96,224 @@ class ScanContext implements Serializable {
     this.splitSize = splitSize;
     this.splitLookback = splitLookback;
     this.splitOpenFileCost = splitOpenFileCost;
+    this.isStreaming = isStreaming;
+    this.monitorInterval = monitorInterval;
+
     this.nameMapping = nameMapping;
-    this.projectedSchema = projectedSchema;
-    this.filterExpressions = filterExpressions;
+    this.schema = schema;
+    this.filters = filters;
     this.limit = limit;
   }
 
-  ScanContext fromProperties(Map<String, String> properties) {
-    Configuration config = new Configuration();
-    properties.forEach(config::setString);
-    return new ScanContext(config.get(CASE_SENSITIVE), config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID),
-        config.get(END_SNAPSHOT_ID), config.get(AS_OF_TIMESTAMP), config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK),
-        config.get(SPLIT_FILE_OPEN_COST), properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions,
-        limit);
-  }
-
   boolean caseSensitive() {
     return caseSensitive;
   }
 
-  ScanContext setCaseSensitive(boolean isCaseSensitive) {
-    return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Long snapshotId() {
     return snapshotId;
   }
 
-  ScanContext useSnapshotId(Long scanSnapshotId) {
-    return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Long startSnapshotId() {
     return startSnapshotId;
   }
 
-  ScanContext startSnapshotId(Long id) {
-    return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId, asOfTimestamp, splitSize, splitLookback,
-        splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Long endSnapshotId() {
     return endSnapshotId;
   }
 
-  ScanContext endSnapshotId(Long id) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id, asOfTimestamp, splitSize, splitLookback,
-        splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Long asOfTimestamp() {
     return asOfTimestamp;
   }
 
-  ScanContext asOfTimestamp(Long timestamp) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, timestamp, splitSize,
-        splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Long splitSize() {
     return splitSize;
   }
 
-  ScanContext splitSize(Long size) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, size,
-        splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Integer splitLookback() {
     return splitLookback;
   }
 
-  ScanContext splitLookback(Integer lookback) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        lookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
-  }
-
   Long splitOpenFileCost() {
     return splitOpenFileCost;
   }
 
-  ScanContext splitOpenFileCost(Long fileCost) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, fileCost, nameMapping, projectedSchema, filterExpressions, limit);
+  boolean isStreaming() {
+    return isStreaming;
+  }
+
+  Duration monitorInterval() {
+    return monitorInterval;
   }
 
   String nameMapping() {
     return nameMapping;
   }
 
-  ScanContext nameMapping(String mapping) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, splitOpenFileCost, mapping, projectedSchema, filterExpressions, limit);
+  Schema project() {
+    return schema;
   }
 
-  Schema projectedSchema() {
-    return projectedSchema;
+  List<Expression> filters() {
+    return filters;
   }
 
-  ScanContext project(Schema schema) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, splitOpenFileCost, nameMapping, schema, filterExpressions, limit);
+  long limit() {
+    return limit;
   }
 
-  List<Expression> filterExpressions() {
-    return filterExpressions;
+  ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
+    return ScanContext.builder()
+        .caseSensitive(caseSensitive)
+        .useSnapshotId(null)
+        .startSnapshotId(newStartSnapshotId)
+        .endSnapshotId(newEndSnapshotId)
+        .asOfTimestamp(null)
+        .splitSize(splitSize)
+        .splitLookback(splitLookback)
+        .splitOpenFileCost(splitOpenFileCost)
+        .streaming(isStreaming)
+        .monitorInterval(monitorInterval)
+        .nameMapping(nameMapping)
+        .project(schema)
+        .filters(filters)
+        .limit(limit)
+        .build();
   }
 
-  ScanContext filterRows(List<Expression> filters) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filters, limit);
+  ScanContext copyWithSnapshotId(long newSnapshotId) {
+    return ScanContext.builder()
+        .caseSensitive(caseSensitive)
+        .useSnapshotId(newSnapshotId)
+        .startSnapshotId(null)
+        .endSnapshotId(null)
+        .asOfTimestamp(null)
+        .splitSize(splitSize)
+        .splitLookback(splitLookback)
+        .splitOpenFileCost(splitOpenFileCost)
+        .streaming(isStreaming)
+        .monitorInterval(monitorInterval)
+        .nameMapping(nameMapping)
+        .project(schema)
+        .filters(filters)
+        .limit(limit)
+        .build();
   }
 
-  long limit() {
-    return limit;
+  static Builder builder() {
+    return new Builder();
   }
 
-  ScanContext limit(Long newLimit) {
-    return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
-        splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, newLimit);
+  static class Builder {
+    private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
+    private Long snapshotId = SNAPSHOT_ID.defaultValue();
+    private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
+    private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
+    private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
+    private Long splitSize = SPLIT_SIZE.defaultValue();
+    private Integer splitLookback = SPLIT_LOOKBACK.defaultValue();
+    private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
+    private boolean isStreaming = STREAMING.defaultValue();
+    private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
+    private String nameMapping;
+    private Schema projectedSchema;
+    private List<Expression> filters;
+    private long limit = -1L;
+
+    private Builder() {
+    }
+
+    Builder caseSensitive(boolean newCaseSensitive) {
+      this.caseSensitive = newCaseSensitive;
+      return this;
+    }
+
+    Builder useSnapshotId(Long newSnapshotId) {
+      this.snapshotId = newSnapshotId;
+      return this;
+    }
+
+    Builder startSnapshotId(Long newStartSnapshotId) {
+      this.startSnapshotId = newStartSnapshotId;
+      return this;
+    }
+
+    Builder endSnapshotId(Long newEndSnapshotId) {
+      this.endSnapshotId = newEndSnapshotId;
+      return this;
+    }
+
+    Builder asOfTimestamp(Long newAsOfTimestamp) {
+      this.asOfTimestamp = newAsOfTimestamp;
+      return this;
+    }
+
+    Builder splitSize(Long newSplitSize) {
+      this.splitSize = newSplitSize;
+      return this;
+    }
+
+    Builder splitLookback(Integer newSplitLookback) {
+      this.splitLookback = newSplitLookback;
+      return this;
+    }
+
+    Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+      this.splitOpenFileCost = newSplitOpenFileCost;
+      return this;
+    }
+
+    Builder streaming(boolean streaming) {
+      this.isStreaming = streaming;
+      return this;
+    }
+
+    Builder monitorInterval(Duration newMonitorInterval) {
+      this.monitorInterval = newMonitorInterval;
+      return this;
+    }
+
+    Builder nameMapping(String newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
+    Builder project(Schema newProjectedSchema) {
+      this.projectedSchema = newProjectedSchema;
+      return this;
+    }
+
+    Builder filters(List<Expression> newFilters) {
+      this.filters = newFilters;
+      return this;
+    }
+
+    Builder limit(long newLimit) {
+      this.limit = newLimit;
+      return this;
+    }
+
+    Builder fromProperties(Map<String, String> properties) {
+      Configuration config = new Configuration();
+      properties.forEach(config::setString);
+
+      return this.useSnapshotId(config.get(SNAPSHOT_ID))
+          .caseSensitive(config.get(CASE_SENSITIVE))
+          .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
+          .startSnapshotId(config.get(START_SNAPSHOT_ID))
+          .endSnapshotId(config.get(END_SNAPSHOT_ID))
+          .splitSize(config.get(SPLIT_SIZE))
+          .splitLookback(config.get(SPLIT_LOOKBACK))
+          .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
+          .streaming(config.get(STREAMING))
+          .monitorInterval(config.get(MONITOR_INTERVAL))
+          .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
+    }
+
+    public ScanContext build() {
+      return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
+          endSnapshotId, asOfTimestamp, splitSize, splitLookback,
+          splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
+          filters, limit);
+    }
   }
 }
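
The builder above is also reachable from string properties, which is the path the SQL hint options take. A small sketch, placed in org.apache.iceberg.flink.source because ScanContext is package-private; the property values are illustrative only.

    package org.apache.iceberg.flink.source;

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class ScanContextSketch {
      static ScanContext streamingContext() {
        // 'streaming' and 'monitor-interval' are the ConfigOptions introduced in this file.
        Map<String, String> props = ImmutableMap.of(
            "streaming", "true",        // switch to the unbounded monitor + reader path
            "monitor-interval", "1s");  // how often StreamingMonitorFunction polls for new snapshots
        return ScanContext.builder()
            .fromProperties(props)
            .build();
      }
    }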
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
new file mode 100644
index 0000000..b31426a
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link TableLoader} and a {@link ScanContext}.
+ * It is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring snapshots of the Iceberg table.</li>
+ *     <li>Creating the {@link FlinkInputSplit splits} corresponding to the incremental files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link StreamingReaderOperator}
+ * which can have parallelism greater than one.
+ */
+public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit> implements CheckpointedFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingMonitorFunction.class);
+
+  private static final long INIT_LAST_SNAPSHOT_ID = -1L;
+
+  private final TableLoader tableLoader;
+  private final ScanContext scanContext;
+
+  private volatile boolean isRunning = true;
+
+  // The checkpoint thread is not the same thread that runs the function for SourceStreamTask now, so it's necessary
+  // to mark this field as volatile.
+  private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID;
+
+  private transient SourceContext<FlinkInputSplit> sourceContext;
+  private transient Table table;
+  private transient ListState<Long> lastSnapshotIdState;
+
+  public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext) {
+    Preconditions.checkArgument(scanContext.snapshotId() == null,
+        "Cannot set snapshot-id option for streaming reader");
+    Preconditions.checkArgument(scanContext.asOfTimestamp() == null,
+        "Cannot set as-of-timestamp option for streaming reader");
+    Preconditions.checkArgument(scanContext.endSnapshotId() == null,
+        "Cannot set end-snapshot-id option for streaming reader");
+    this.tableLoader = tableLoader;
+    this.scanContext = scanContext;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    // Load iceberg table from table loader.
+    tableLoader.open();
+    table = tableLoader.loadTable();
+
+    // Initialize the flink state for last snapshot id.
+    lastSnapshotIdState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "snapshot-id-state",
+            LongSerializer.INSTANCE));
+
+    // Restore the last-snapshot-id from flink's state if possible.
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+      lastSnapshotId = lastSnapshotIdState.get().iterator().next();
+    } else if (scanContext.startSnapshotId() != null) {
+      Preconditions.checkNotNull(table.currentSnapshot(), "Don't have any available snapshot in table.");
+
+      long currentSnapshotId = table.currentSnapshot().snapshotId();
+      Preconditions.checkState(SnapshotUtil.ancestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
+          "The option start-snapshot-id %s is not an ancestor of the current snapshot.", scanContext.startSnapshotId());
+
+      lastSnapshotId = scanContext.startSnapshotId();
+    }
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    lastSnapshotIdState.clear();
+    lastSnapshotIdState.add(lastSnapshotId);
+  }
+
+  @Override
+  public void run(SourceContext<FlinkInputSplit> ctx) throws Exception {
+    this.sourceContext = ctx;
+    while (isRunning) {
+      synchronized (sourceContext.getCheckpointLock()) {
+        if (isRunning) {
+          monitorAndForwardSplits();
+        }
+      }
+      Thread.sleep(scanContext.monitorInterval().toMillis());
+    }
+  }
+
+  private void monitorAndForwardSplits() {
+    // Refresh the table to get the latest committed snapshot.
+    table.refresh();
+
+    Snapshot snapshot = table.currentSnapshot();
+    if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
+      long snapshotId = snapshot.snapshotId();
+
+      ScanContext newScanContext;
+      if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
+        newScanContext = scanContext.copyWithSnapshotId(snapshotId);
+      } else {
+        newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
+      }
+
+      FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+      for (FlinkInputSplit split : splits) {
+        sourceContext.collect(split);
+      }
+
+      lastSnapshotId = snapshotId;
+    }
+  }
+
+  @Override
+  public void cancel() {
+    // This is to cover the case where cancel() is called before run().
+    if (sourceContext != null) {
+      synchronized (sourceContext.getCheckpointLock()) {
+        isRunning = false;
+      }
+    } else {
+      isRunning = false;
+    }
+
+    // Release all the resources here.
+    if (tableLoader != null) {
+      try {
+        tableLoader.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    cancel();
+  }
+}
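
For context, a ScanContext produced by copyWithAppendsBetween() implies an incremental append scan on the core Iceberg API, roughly as sketched below. The actual wiring lives in FlinkSplitGenerator, whose start/end-snapshot handling is not part of the hunks shown in this diff, so treat this as an approximation.

    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class IncrementalScanSketch {
      static CloseableIterable<CombinedScanTask> appendsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
        // Plan only the files appended after fromSnapshotId, up to and including toSnapshotId.
        return table.newScan()
            .appendsBetween(fromSnapshotId, toSnapshotId)
            .planTasks();  // each CombinedScanTask becomes one FlinkInputSplit downstream
      }
    }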
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
new file mode 100644
index 0000000..235b173
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.Queue;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The operator that reads the {@link FlinkInputSplit splits} received from the preceding {@link
+ * StreamingMonitorFunction}. Contrary to the {@link StreamingMonitorFunction} which has a parallelism of 1,
+ * this operator can run with a parallelism greater than one.
+ *
+ * <p>As soon as a split descriptor is received, it is put into a queue, and the {@link MailboxExecutor}
+ * is used to read the actual data of the split. This architecture allows the separation of the reading thread
+ * from the one processing the checkpoint barriers, thus removing any potential back-pressure.
+ */
+public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
+    implements OneInputStreamOperator<FlinkInputSplit, RowData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingReaderOperator.class);
+
+  // It's the same thread that runs this operator and the checkpoint actions. We use this executor to schedule only
+  // one split for future reading, so that a new checkpoint can be triggered without blocking for a long time on
+  // exhausting all scheduled splits.
+  private final MailboxExecutor executor;
+  private FlinkInputFormat format;
+
+  private transient SourceFunction.SourceContext<RowData> sourceContext;
+
+  private transient ListState<FlinkInputSplit> inputSplitsState;
+  private transient Queue<FlinkInputSplit> splits;
+
+  // Splits are read by the same thread that calls processElement. Each read task is submitted to that thread by adding
+  // it to the executor. This state is used to ensure that only one read task is in that queue at a time, so that read
+  // tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING.
+  // When there are no more files to read, this will be set to IDLE.
+  private transient SplitState currentSplitState;
+
+  private StreamingReaderOperator(FlinkInputFormat format, ProcessingTimeService timeService,
+                                  MailboxExecutor mailboxExecutor) {
+    this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
+    this.processingTimeService = timeService;
+    this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
+
+    // TODO Replace Java serialization with Avro approach to keep state compatibility.
+    // See issue: https://github.com/apache/iceberg/issues/1698
+    inputSplitsState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>("splits", new JavaSerializer<>()));
+
+    // Initialize the current split state to IDLE.
+    currentSplitState = SplitState.IDLE;
+
+    // Recover splits state from flink state backend if possible.
+    splits = Lists.newLinkedList();
+    if (context.isRestored()) {
+      int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+      LOG.info("Restoring state for the {} (taskIdx: {}).", getClass().getSimpleName(), subtaskIdx);
+
+      for (FlinkInputSplit split : inputSplitsState.get()) {
+        splits.add(split);
+      }
+    }
+
+    this.sourceContext = StreamSourceContexts.getSourceContext(
+        getOperatorConfig().getTimeCharacteristic(),
+        getProcessingTimeService(),
+        new Object(), // no actual locking needed
+        getContainingTask().getStreamStatusMaintainer(),
+        output,
+        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+        -1);
+
+    // Enqueue to process the recovered input splits.
+    enqueueProcessSplits();
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    inputSplitsState.clear();
+    inputSplitsState.addAll(Lists.newArrayList(splits));
+  }
+
+  @Override
+  public void processElement(StreamRecord<FlinkInputSplit> element) {
+    splits.add(element.getValue());
+    enqueueProcessSplits();
+  }
+
+  private void enqueueProcessSplits() {
+    if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
+      currentSplitState = SplitState.RUNNING;
+      executor.execute(this::processSplits, this.getClass().getSimpleName());
+    }
+  }
+
+  private void processSplits() throws IOException {
+    FlinkInputSplit split = splits.poll();
+    if (split == null) {
+      currentSplitState = SplitState.IDLE;
+      return;
+    }
+
+    format.open(split);
+    try {
+      RowData nextElement = null;
+      while (!format.reachedEnd()) {
+        nextElement = format.nextRecord(nextElement);
+        sourceContext.collect(nextElement);
+      }
+    } finally {
+      currentSplitState = SplitState.IDLE;
+      format.close();
+    }
+
+    // Re-schedule to process the next split.
+    enqueueProcessSplits();
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) {
+    // We do nothing here because we emit our own watermarks if needed.
+  }
+
+  @Override
+  public void dispose() throws Exception {
+    super.dispose();
+
+    if (format != null) {
+      format.close();
+      format.closeInputFormat();
+      format = null;
+    }
+
+    sourceContext = null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    output.close();
+    if (sourceContext != null) {
+      sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+      sourceContext.close();
+      sourceContext = null;
+    }
+  }
+
+  static OneInputStreamOperatorFactory<FlinkInputSplit, RowData> factory(FlinkInputFormat format) {
+    return new OperatorFactory(format);
+  }
+
+  private enum SplitState {
+    IDLE, RUNNING
+  }
+
+  private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
+      implements YieldingOperatorFactory<RowData>, OneInputStreamOperatorFactory<FlinkInputSplit, RowData> {
+
+    private final FlinkInputFormat format;
+
+    private transient MailboxExecutor mailboxExecutor;
+
+    private OperatorFactory(FlinkInputFormat format) {
+      this.format = format;
+    }
+
+    @Override
+    public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+      this.mailboxExecutor = mailboxExecutor;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
+      StreamingReaderOperator operator = new StreamingReaderOperator(format, processingTimeService, mailboxExecutor);
+      operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+      return (O) operator;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+      return StreamingReaderOperator.class;
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 6782267..8302be3 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -72,8 +72,17 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     return tEnv;
   }
 
+  protected static TableResult exec(TableEnvironment env, String query, Object... args) {
+    return env.executeSql(String.format(query, args));
+  }
+
+  protected TableResult exec(String query, Object... args) {
+    return exec(getTableEnv(), query, args);
+  }
+
   protected List<Object[]> sql(String query, Object... args) {
-    TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
+    TableResult tableResult = exec(String.format(query, args));
+
     tableResult.getJobClient().ifPresent(c -> {
       try {
         c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
@@ -81,12 +90,17 @@ public abstract class FlinkTestBase extends AbstractTestBase {
         throw new RuntimeException(e);
       }
     });
-    CloseableIterator<Row> iter = tableResult.collect();
+
     List<Object[]> results = Lists.newArrayList();
-    while (iter.hasNext()) {
-      Row row = iter.next();
-      results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+    try (CloseableIterator<Row> iter = tableResult.collect()) {
+      while (iter.hasNext()) {
+        Row row = iter.next();
+        results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
+
     return results;
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java b/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
index f3df428..5f7ae29 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
@@ -26,6 +26,10 @@ import org.apache.iceberg.TestTables;
 public class TestTableLoader implements TableLoader {
   private File dir;
 
+  public static TableLoader of(String dir) {
+    return new TestTableLoader(dir);
+  }
+
   public TestTableLoader(String dir) {
     this.dir = new File(dir);
   }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
new file mode 100644
index 0000000..91ce983
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStreamScanSql extends FlinkCatalogTestBase {
+  private static final String TABLE = "test_table";
+  private static final FileFormat FORMAT = FileFormat.PARQUET;
+
+  private TableEnvironment tEnv;
+
+  public TestStreamScanSql(String catalogName, Namespace baseNamespace) {
+    super(catalogName, baseNamespace);
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inStreamingMode();
+
+          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+          env.enableCheckpointing(400);
+
+          StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+          streamTableEnv.getConfig()
+              .getConfiguration()
+              .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+          tEnv = streamTableEnv;
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @Before
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  @After
+  public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE);
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+  }
+
+  private void insertRows(String partition, Table table, Row... rows) throws IOException {
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, TEMPORARY_FOLDER);
+
+    GenericRecord gRecord = GenericRecord.create(table.schema());
+    List<Record> records = Lists.newArrayList();
+    for (Row row : rows) {
+      records.add(gRecord.copy(
+          "id", row.getField(0),
+          "data", row.getField(1),
+          "dt", row.getField(2)
+      ));
+    }
+
+    if (partition != null) {
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+    } else {
+      appender.appendToTable(records);
+    }
+  }
+
+  private void insertRows(Table table, Row... rows) throws IOException {
+    insertRows(null, table, rows);
+  }
+
+  private void assertRows(List<Row> expectedRows, Iterator<Row> iterator) {
+    for (Row expectedRow : expectedRows) {
+      Assert.assertTrue("Should have more records", iterator.hasNext());
+
+      Row actualRow = iterator.next();
+      Assert.assertEquals("Should have expected fields", 3, actualRow.getArity());
+      Assert.assertEquals("Should have expected id", expectedRow.getField(0), actualRow.getField(0));
+      Assert.assertEquals("Should have expected data", expectedRow.getField(1), actualRow.getField(1));
+      Assert.assertEquals("Should have expected dt", expectedRow.getField(2), actualRow.getField(2));
+    }
+  }
+
+  @Test
+  public void testUnPartitionedTable() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+    try (CloseableIterator<Row> iterator = result.collect()) {
+
+      Row row1 = Row.of(1, "aaa", "2021-01-01");
+      insertRows(table, row1);
+      assertRows(ImmutableList.of(row1), iterator);
+
+      Row row2 = Row.of(2, "bbb", "2021-01-01");
+      insertRows(table, row2);
+      assertRows(ImmutableList.of(row2), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY (dt)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      Row row1 = Row.of(1, "aaa", "2021-01-01");
+      insertRows("2021-01-01", table, row1);
+      assertRows(ImmutableList.of(row1), iterator);
+
+      Row row2 = Row.of(2, "bbb", "2021-01-02");
+      insertRows("2021-01-02", table, row2);
+      assertRows(ImmutableList.of(row2), iterator);
+
+      Row row3 = Row.of(1, "aaa", "2021-01-02");
+      insertRows("2021-01-02", table, row3);
+      assertRows(ImmutableList.of(row3), iterator);
+
+      Row row4 = Row.of(2, "bbb", "2021-01-01");
+      insertRows("2021-01-01", table, row4);
+      assertRows(ImmutableList.of(row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @Test
+  public void testConsumeFromBeginning() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+    insertRows(table, row1, row2);
+
+    TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      insertRows(table, row3);
+      assertRows(ImmutableList.of(row3), iterator);
+
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRows(table, row4);
+      assertRows(ImmutableList.of(row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @Test
+  public void testConsumeFromStartSnapshotId() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots.
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+    insertRows(table, row1);
+    insertRows(table, row2);
+
+    long startSnapshotId = table.currentSnapshot().snapshotId();
+
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+    insertRows(table, row3, row4);
+
+    TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', " +
+        "'start-snapshot-id'='%d')*/", TABLE, startSnapshotId);
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // Rows committed in or before the start snapshot (row1, row2) are excluded; the scan starts after it.
+      assertRows(ImmutableList.of(row3, row4), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
new file mode 100644
index 0000000..dcd41dc
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStreamingMonitorFunction extends TableTestBase {
+
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get())
+  );
+  private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;
+  private static final long WAIT_TIME_MILLIS = 10 * 1000L;
+
+  @Parameterized.Parameters(name = "FormatVersion={0}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {1},
+        new Object[] {2}
+    );
+  }
+
+  public TestStreamingMonitorFunction(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Before
+  @Override
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    // Construct the Iceberg table.
+    table = create(SCHEMA, PartitionSpec.unpartitioned());
+  }
+
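+  // Run the monitor function in a separate thread so the test thread can wait on the latch and
+  // assert on the emitted splits.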
+  private void runSourceFunctionInTask(TestSourceContext sourceContext, StreamingMonitorFunction function) {
+    Thread task = new Thread(() -> {
+      try {
+        function.run(sourceContext);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    task.start();
+  }
+
+  @Test
+  public void testConsumeWithoutStartSnapshotId() throws Exception {
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+    ScanContext scanContext = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .build();
+
+    StreamingMonitorFunction function = createFunction(scanContext);
+    try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runSourceFunctionInTask(sourceContext, function);
+
+      Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+      Thread.sleep(1000L);
+
+      // Stop the stream task.
+      function.close();
+
+      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+      TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+    }
+  }
+
+  @Test
+  public void testConsumeFromStartSnapshotId() throws Exception {
+    // Commit the first five transactions.
+    generateRecordsAndCommitTxn(5);
+    long startSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Commit the next five transactions.
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(5);
+
+    ScanContext scanContext = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .startSnapshotId(startSnapshotId)
+        .build();
+
+    StreamingMonitorFunction function = createFunction(scanContext);
+    try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runSourceFunctionInTask(sourceContext, function);
+
+      Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+      Thread.sleep(1000L);
+
+      // Stop the stream task.
+      function.close();
+
+      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+      TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+    ScanContext scanContext = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .build();
+
+    StreamingMonitorFunction func = createFunction(scanContext);
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(func)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runSourceFunctionInTask(sourceContext, func);
+
+      Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+      Thread.sleep(1000L);
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      func.close();
+
+      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+      TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+    }
+
+    List<List<Record>> newRecordsList = generateRecordsAndCommitTxn(10);
+    StreamingMonitorFunction newFunc = createFunction(scanContext);
+    try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(newFunc)) {
+      harness.setup();
+      // Recover to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runSourceFunctionInTask(sourceContext, newFunc);
+
+      Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+      Thread.sleep(1000L);
+
+      // Stop the stream task.
+      newFunc.close();
+
+      Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+      TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
+    }
+  }
+
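+  // Generate commitTimes batches of 100 random records, committing each batch as a separate snapshot.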
+  private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
+    List<List<Record>> expectedRecords = Lists.newArrayList();
+    for (int i = 0; i < commitTimes; i++) {
+      List<Record> records = RandomGenericData.generate(SCHEMA, 100, 0L);
+      expectedRecords.add(records);
+
+      // Commit those records to the Iceberg table.
+      writeRecords(records);
+    }
+    return expectedRecords;
+  }
+
+  private void writeRecords(List<Record> records) throws IOException {
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, DEFAULT_FORMAT, temp);
+    appender.appendToTable(records);
+  }
+
+  private StreamingMonitorFunction createFunction(ScanContext scanContext) {
+    return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext);
+  }
+
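+  // Wrap the monitor function in a StreamSource operator and a single-subtask test harness.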
+  private AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness(StreamingMonitorFunction function)
+      throws Exception {
+    StreamSource<FlinkInputSplit, StreamingMonitorFunction> streamSource = new StreamSource<>(function);
+    return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
+  }
+
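+  // A SourceContext stub that collects the emitted splits and counts down the latch for each collected split,
+  // so the test can wait until the monitor function has produced output.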
+  private class TestSourceContext implements SourceFunction.SourceContext<FlinkInputSplit> {
+    private final List<FlinkInputSplit> splits = Lists.newArrayList();
+    private final Object checkpointLock = new Object();
+    private final CountDownLatch latch;
+
+    TestSourceContext(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void collect(FlinkInputSplit element) {
+      splits.add(element);
+      latch.countDown();
+    }
+
+    @Override
+    public void collectWithTimestamp(FlinkInputSplit element, long timestamp) {
+      collect(element);
+    }
+
+    @Override
+    public void emitWatermark(Watermark mark) {
+
+    }
+
+    @Override
+    public void markAsTemporarilyIdle() {
+
+    }
+
+    @Override
+    public Object getCheckpointLock() {
+      return checkpointLock;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
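+    // Re-read the collected splits with a FlinkInputFormat and convert the records into rows for assertion.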
+    private List<Row> toRows() throws IOException {
+      FlinkInputFormat format = FlinkSource.forRowData()
+          .tableLoader(TestTableLoader.of(tableDir.getAbsolutePath()))
+          .buildFormat();
+
+      List<Row> rows = Lists.newArrayList();
+      for (FlinkInputSplit split : splits) {
+        format.open(split);
+
+        RowData element = null;
+        try {
+          while (!format.reachedEnd()) {
+            element = format.nextRecord(element);
+            rows.add(Row.of(element.getInt(0), element.getString(1).toString()));
+          }
+        } finally {
+          format.close();
+        }
+      }
+
+      return rows;
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
new file mode 100644
index 0000000..112f021
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStreamingReaderOperator extends TableTestBase {
+
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get())
+  );
+  private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;
+
+  @Parameterized.Parameters(name = "FormatVersion={0}")
+  public static Iterable<Object[]> parameters() {
+    return ImmutableList.of(
+        new Object[] {1},
+        new Object[] {2}
+    );
+  }
+
+  public TestStreamingReaderOperator(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Before
+  @Override
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    this.metadataDir = new File(tableDir, "metadata");
+    Assert.assertTrue(tableDir.delete());
+
+    // Construct the Iceberg table.
+    table = create(SCHEMA, PartitionSpec.unpartitioned());
+  }
+
+  @Test
+  public void testProcessAllRecords() throws Exception {
+    List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(10);
+
+    List<FlinkInputSplit> splits = generateSplits();
+    Assert.assertEquals("Should have 10 splits", 10, splits.size());
+
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+
+      List<Record> expected = Lists.newArrayList();
+      for (int i = 0; i < splits.size(); i++) {
+        // Process this element to enqueue it into the mailbox.
+        harness.processElement(splits.get(i), -1);
+
+        // Run the mailbox once to read all records from the given split.
+        Assert.assertTrue("Should have processed 1 split", processor.runMailboxStep());
+
+        // Assert the output has expected elements.
+        expected.addAll(expectedRecords.get(i));
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+    }
+  }
+
+  @Test
+  public void testTriggerCheckpoint() throws Exception {
+    // Emit three splits (split0, split1 and split2); the checkpoint request is triggered while the reader is
+    // still consuming records from split0.
+    List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(3);
+
+    List<FlinkInputSplit> splits = generateSplits();
+    Assert.assertEquals("Should have 3 splits", 3, splits.size());
+
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+
+      harness.processElement(splits.get(0), ++timestamp);
+      harness.processElement(splits.get(1), ++timestamp);
+      harness.processElement(splits.get(2), ++timestamp);
+
+      // Trigger the snapshot; it will run once all records from split0 have been read.
+      processor.getMainMailboxExecutor()
+          .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+      Assert.assertTrue("Should have processed the split0", processor.runMailboxStep());
+      Assert.assertTrue("Should have processed the snapshot state action", processor.runMailboxStep());
+
+      TestFlinkScan.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA);
+
+      // Read records from split1.
+      Assert.assertTrue("Should have processed the split1", processor.runMailboxStep());
+
+      // Read records from split2.
+      Assert.assertTrue("Should have processed the split2", processor.runMailboxStep());
+
+      TestFlinkScan.assertRecords(readOutputValues(harness),
+          Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA);
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(15);
+
+    List<FlinkInputSplit> splits = generateSplits();
+    Assert.assertEquals("Should have 10 splits", 15, splits.size());
+
+    OperatorSubtaskState state;
+    List<Record> expected = Lists.newArrayList();
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      // Enqueue all the splits.
+      for (FlinkInputSplit split : splits) {
+        harness.processElement(split, -1);
+      }
+
+      // Read all records from the first five splits.
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+      for (int i = 0; i < 5; i++) {
+        expected.addAll(expectedRecords.get(i));
+        Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep());
+
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+
+      // Snapshot the state now; there are 10 splits left in the state.
+      state = harness.snapshot(1, 1);
+    }
+
+    expected.clear();
+    try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+      harness.setup();
+      // Recover to process the remaining splits.
+      harness.initializeState(state);
+      harness.open();
+
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+
+      for (int i = 5; i < 10; i++) {
+        expected.addAll(expectedRecords.get(i));
+        Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep());
+
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+
+      // Let's process the final 5 splits now.
+      for (int i = 10; i < 15; i++) {
+        expected.addAll(expectedRecords.get(i));
+        harness.processElement(splits.get(i), 1);
+
+        Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep());
+        TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+      }
+    }
+  }
+
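+  // Convert the RowData emitted by the reader operator into rows of (id, data) for comparison.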
+  private List<Row> readOutputValues(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
+    List<Row> results = Lists.newArrayList();
+    for (RowData rowData : harness.extractOutputValues()) {
+      results.add(Row.of(rowData.getInt(0), rowData.getString(1).toString()));
+    }
+    return results;
+  }
+
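+  // Commit commitTimes batches of 100 random records, one snapshot per batch.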
+  private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
+    List<List<Record>> expectedRecords = Lists.newArrayList();
+    for (int i = 0; i < commitTimes; i++) {
+      List<Record> records = RandomGenericData.generate(SCHEMA, 100, 0L);
+      expectedRecords.add(records);
+
+      // Commit those records to the Iceberg table.
+      writeRecords(records);
+    }
+    return expectedRecords;
+  }
+
+  private void writeRecords(List<Record> records) throws IOException {
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, DEFAULT_FORMAT, temp);
+    appender.appendToTable(records);
+  }
+
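+  // Plan one batch of splits per snapshot: a full scan of the oldest snapshot, then an incremental scan
+  // between each pair of consecutive snapshots.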
+  private List<FlinkInputSplit> generateSplits() {
+    List<FlinkInputSplit> inputSplits = Lists.newArrayList();
+
+    List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+    for (int i = snapshotIds.size() - 1; i >= 0; i--) {
+      ScanContext scanContext;
+      if (i == snapshotIds.size() - 1) {
+        // Generate the splits from the first snapshot.
+        scanContext = ScanContext.builder()
+            .useSnapshotId(snapshotIds.get(i))
+            .build();
+      } else {
+        // Generate the splits between the previous snapshot and the current snapshot.
+        scanContext = ScanContext.builder()
+            .startSnapshotId(snapshotIds.get(i + 1))
+            .endSnapshotId(snapshotIds.get(i))
+            .build();
+      }
+
+      Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext));
+    }
+
+    return inputSplits;
+  }
+
+  private OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader() throws Exception {
+    // This input format is used to open the emitted splits.
+    FlinkInputFormat inputFormat = FlinkSource.forRowData()
+        .tableLoader(TestTableLoader.of(tableDir.getAbsolutePath()))
+        .buildFormat();
+
+    OneInputStreamOperatorFactory<FlinkInputSplit, RowData> factory = StreamingReaderOperator.factory(inputFormat);
+    OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = new OneInputStreamOperatorTestHarness<>(
+        factory, 1, 1, 0);
+    harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+    return harness;
+  }
+
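+  // Build a stepping mailbox processor so the test can drive the reader operator one mailbox step at a time.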
+  private SteppingMailboxProcessor createLocalMailbox(
+      OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
+    return new SteppingMailboxProcessor(
+        MailboxDefaultAction.Controller::suspendDefaultAction,
+        harness.getTaskMailbox(),
+        StreamTaskActionExecutor.IMMEDIATE);
+  }
+}