Posted to commits@iceberg.apache.org by op...@apache.org on 2021/01/14 04:35:36 UTC
[iceberg] branch master updated: Flink: Support streaming reader. (#1793)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 14331c4 Flink: Support streaming reader. (#1793)
14331c4 is described below
commit 14331c4e5f61e14cdc527a567f5dafc7fd95c3e7
Author: openinx <op...@gmail.com>
AuthorDate: Thu Jan 14 12:35:25 2021 +0800
Flink: Support streaming reader. (#1793)
---
.../apache/iceberg/data/GenericAppenderHelper.java | 2 +-
.../iceberg/flink/source/FlinkInputFormat.java | 4 +-
.../iceberg/flink/source/FlinkInputSplit.java | 9 +
.../apache/iceberg/flink/source/FlinkSource.java | 74 ++---
.../iceberg/flink/source/FlinkSplitGenerator.java | 6 +-
.../apache/iceberg/flink/source/ScanContext.java | 268 ++++++++++++------
.../flink/source/StreamingMonitorFunction.java | 177 ++++++++++++
.../flink/source/StreamingReaderOperator.java | 232 ++++++++++++++++
.../org/apache/iceberg/flink/FlinkTestBase.java | 24 +-
.../org/apache/iceberg/flink/TestTableLoader.java | 4 +
.../iceberg/flink/source/TestStreamScanSql.java | 241 +++++++++++++++++
.../flink/source/TestStreamingMonitorFunction.java | 300 +++++++++++++++++++++
.../flink/source/TestStreamingReaderOperator.java | 283 +++++++++++++++++++
13 files changed, 1491 insertions(+), 133 deletions(-)
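For readers skimming the diff, here is a minimal DataStream-API sketch of the feature this commit adds. The env(...) setter and the warehouse path are assumptions for illustration; streaming(true) and the monitor/reader wiring are what the hunks below introduce.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class StreamingReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hypothetical warehouse path; any TableLoader pointing at an existing Iceberg table works.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/test_table");

    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)          // assumed setter for the execution environment (not shown in this diff)
        .tableLoader(loader)
        .streaming(true)   // new in this commit: keep monitoring the table for new snapshots
        .build();

    stream.print();
    env.execute("iceberg-streaming-read");
  }
}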
diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index c32be08..299135a 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -88,7 +88,7 @@ public class GenericAppenderHelper {
return DataFiles.builder(table.spec())
.withRecordCount(records.size())
.withFileSizeInBytes(file.length())
- .withPath(file.toURI().toString())
+ .withPath(Files.localInput(file).location())
.withMetrics(appender.metrics())
.withFormat(format)
.withPartition(partition)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index f25af65..1bad1c2 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -61,7 +61,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
@VisibleForTesting
Schema projectedSchema() {
- return context.projectedSchema();
+ return context.project();
}
@Override
@@ -92,7 +92,7 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
@Override
public void open(FlinkInputSplit split) {
this.iterator = new RowDataIterator(
- split.getTask(), io, encryption, tableSchema, context.projectedSchema(), context.nameMapping(),
+ split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
context.caseSensitive());
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
index 21a2f71..b59574f 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.source;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
/**
* TODO Implement {@link LocatableInputSplit}.
@@ -44,4 +45,12 @@ public class FlinkInputSplit implements InputSplit {
CombinedScanTask getTask() {
return task;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("splitNumber", splitNumber)
+ .add("task", task)
+ .toString();
+ }
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index e4b5907..95a7ba9 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -23,12 +23,12 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
@@ -70,10 +70,7 @@ public class FlinkSource {
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
- private long limit;
- private ScanContext context = new ScanContext();
-
- private RowDataTypeInfo rowTypeInfo;
+ private final ScanContext.Builder contextBuilder = ScanContext.builder();
public Builder tableLoader(TableLoader newLoader) {
this.tableLoader = newLoader;
@@ -91,7 +88,7 @@ public class FlinkSource {
}
public Builder filters(List<Expression> filters) {
- this.context = context.filterRows(filters);
+ contextBuilder.filters(filters);
return this;
}
@@ -101,57 +98,62 @@ public class FlinkSource {
}
public Builder limit(long newLimit) {
- this.limit = newLimit;
+ contextBuilder.limit(newLimit);
return this;
}
public Builder properties(Map<String, String> properties) {
- this.context = context.fromProperties(properties);
+ contextBuilder.fromProperties(properties);
return this;
}
public Builder caseSensitive(boolean caseSensitive) {
- this.context = context.setCaseSensitive(caseSensitive);
+ contextBuilder.caseSensitive(caseSensitive);
return this;
}
public Builder snapshotId(Long snapshotId) {
- this.context = context.useSnapshotId(snapshotId);
+ contextBuilder.useSnapshotId(snapshotId);
return this;
}
public Builder startSnapshotId(Long startSnapshotId) {
- this.context = context.startSnapshotId(startSnapshotId);
+ contextBuilder.startSnapshotId(startSnapshotId);
return this;
}
public Builder endSnapshotId(Long endSnapshotId) {
- this.context = context.endSnapshotId(endSnapshotId);
+ contextBuilder.endSnapshotId(endSnapshotId);
return this;
}
public Builder asOfTimestamp(Long asOfTimestamp) {
- this.context = context.asOfTimestamp(asOfTimestamp);
+ contextBuilder.asOfTimestamp(asOfTimestamp);
return this;
}
public Builder splitSize(Long splitSize) {
- this.context = context.splitSize(splitSize);
+ contextBuilder.splitSize(splitSize);
return this;
}
public Builder splitLookback(Integer splitLookback) {
- this.context = context.splitLookback(splitLookback);
+ contextBuilder.splitLookback(splitLookback);
return this;
}
public Builder splitOpenFileCost(Long splitOpenFileCost) {
- this.context = context.splitOpenFileCost(splitOpenFileCost);
+ contextBuilder.splitOpenFileCost(splitOpenFileCost);
+ return this;
+ }
+
+ public Builder streaming(boolean streaming) {
+ contextBuilder.streaming(streaming);
return this;
}
public Builder nameMapping(String nameMapping) {
- this.context = context.nameMapping(nameMapping);
+ contextBuilder.nameMapping(nameMapping);
return this;
}
@@ -178,35 +180,37 @@ public class FlinkSource {
encryption = table.encryption();
}
- rowTypeInfo = RowDataTypeInfo.of((RowType) (
- projectedSchema == null ?
- FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) :
- projectedSchema).toRowDataType().getLogicalType());
-
- context = context.project(projectedSchema == null ? icebergSchema :
- FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
-
- context = context.limit(limit);
+ if (projectedSchema == null) {
+ contextBuilder.project(icebergSchema);
+ } else {
+ contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
+ }
- return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, context);
+ return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}
public DataStream<RowData> build() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
FlinkInputFormat format = buildFormat();
- if (isBounded(context)) {
- return env.createInput(format, rowTypeInfo);
+
+ ScanContext context = contextBuilder.build();
+ TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
+
+ if (!context.isStreaming()) {
+ return env.createInput(format, typeInfo);
} else {
- throw new UnsupportedOperationException("The Unbounded mode is not supported yet");
+ StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
+
+ String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
+ String readerOperatorName = String.format("Iceberg table (%s) reader", table);
+
+ return env.addSource(function, monitorFunctionName)
+ .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
}
}
}
- private static boolean isBounded(ScanContext context) {
- return context.startSnapshotId() == null || context.endSnapshotId() != null;
- }
-
public static boolean isBounded(Map<String, String> properties) {
- return isBounded(new ScanContext().fromProperties(properties));
+ return !ScanContext.builder().fromProperties(properties).build().isStreaming();
}
}
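The SQL path drives the same switch through dynamic table options (hints), as the TestStreamScanSql cases later in this commit do. A hedged sketch, assuming a StreamTableEnvironment tEnv with table.dynamic-table-options.enabled set to true:

// Streaming scan via SQL hints, mirroring TestStreamScanSql below.
TableResult result = tEnv.executeSql(
    "SELECT * FROM test_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */");
try (CloseableIterator<Row> rows = result.collect()) {
  // collect() keeps yielding rows as new snapshots are committed to the table.
  while (rows.hasNext()) {
    System.out.println(rows.next());
  }
} finally {
  result.getJobClient().ifPresent(JobClient::cancel);
}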
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
index ade4cfb..f495e09 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
@@ -47,7 +47,7 @@ class FlinkSplitGenerator {
TableScan scan = table
.newScan()
.caseSensitive(context.caseSensitive())
- .project(context.projectedSchema());
+ .project(context.project());
if (context.snapshotId() != null) {
scan = scan.useSnapshot(context.snapshotId());
@@ -77,8 +77,8 @@ class FlinkSplitGenerator {
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
}
- if (context.filterExpressions() != null) {
- for (Expression filter : context.filterExpressions()) {
+ if (context.filters() != null) {
+ for (Expression filter : context.filters()) {
scan = scan.filter(filter);
}
}
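For orientation, the ScanContext snapshot options select the scan shape on Iceberg's core TableScan API. The start/end-snapshot branch sits outside this hunk's context lines, so the sketch below is an assumption modeled on the visible useSnapshot(...) branch:

// Sketch (not the literal hunk), assuming `table` and `context` are in scope.
TableScan scan = table.newScan()
    .caseSensitive(context.caseSensitive())
    .project(context.project());

if (context.snapshotId() != null) {
  // Read the table contents as of one specific snapshot.
  scan = scan.useSnapshot(context.snapshotId());
} else if (context.startSnapshotId() != null && context.endSnapshotId() != null) {
  // Incremental read: only the files appended in (startSnapshotId, endSnapshotId].
  scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
}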
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 42804a0..2896efb 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.flink.source;
import java.io.Serializable;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.ConfigOption;
@@ -61,6 +62,12 @@ class ScanContext implements Serializable {
private static final ConfigOption<Long> SPLIT_FILE_OPEN_COST =
ConfigOptions.key("split-file-open-cost").longType().defaultValue(null);
+ private static final ConfigOption<Boolean> STREAMING =
+ ConfigOptions.key("streaming").booleanType().defaultValue(false);
+
+ private static final ConfigOption<Duration> MONITOR_INTERVAL =
+ ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
+
private final boolean caseSensitive;
private final Long snapshotId;
private final Long startSnapshotId;
@@ -69,29 +76,18 @@ class ScanContext implements Serializable {
private final Long splitSize;
private final Integer splitLookback;
private final Long splitOpenFileCost;
+ private final boolean isStreaming;
+ private final Duration monitorInterval;
+
private final String nameMapping;
- private final Schema projectedSchema;
- private final List<Expression> filterExpressions;
- private final Long limit;
-
- ScanContext() {
- this.caseSensitive = CASE_SENSITIVE.defaultValue();
- this.snapshotId = SNAPSHOT_ID.defaultValue();
- this.startSnapshotId = START_SNAPSHOT_ID.defaultValue();
- this.endSnapshotId = END_SNAPSHOT_ID.defaultValue();
- this.asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
- this.splitSize = SPLIT_SIZE.defaultValue();
- this.splitLookback = SPLIT_LOOKBACK.defaultValue();
- this.splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
- this.nameMapping = null;
- this.projectedSchema = null;
- this.filterExpressions = null;
- this.limit = null;
- }
+ private final Schema schema;
+ private final List<Expression> filters;
+ private final long limit;
private ScanContext(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId,
- Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
- String nameMapping, Schema projectedSchema, List<Expression> filterExpressions, Long limit) {
+ Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost,
+ boolean isStreaming, Duration monitorInterval, String nameMapping,
+ Schema schema, List<Expression> filters, long limit) {
this.caseSensitive = caseSensitive;
this.snapshotId = snapshotId;
this.startSnapshotId = startSnapshotId;
@@ -100,126 +96,224 @@ class ScanContext implements Serializable {
this.splitSize = splitSize;
this.splitLookback = splitLookback;
this.splitOpenFileCost = splitOpenFileCost;
+ this.isStreaming = isStreaming;
+ this.monitorInterval = monitorInterval;
+
this.nameMapping = nameMapping;
- this.projectedSchema = projectedSchema;
- this.filterExpressions = filterExpressions;
+ this.schema = schema;
+ this.filters = filters;
this.limit = limit;
}
- ScanContext fromProperties(Map<String, String> properties) {
- Configuration config = new Configuration();
- properties.forEach(config::setString);
- return new ScanContext(config.get(CASE_SENSITIVE), config.get(SNAPSHOT_ID), config.get(START_SNAPSHOT_ID),
- config.get(END_SNAPSHOT_ID), config.get(AS_OF_TIMESTAMP), config.get(SPLIT_SIZE), config.get(SPLIT_LOOKBACK),
- config.get(SPLIT_FILE_OPEN_COST), properties.get(DEFAULT_NAME_MAPPING), projectedSchema, filterExpressions,
- limit);
- }
-
boolean caseSensitive() {
return caseSensitive;
}
- ScanContext setCaseSensitive(boolean isCaseSensitive) {
- return new ScanContext(isCaseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Long snapshotId() {
return snapshotId;
}
- ScanContext useSnapshotId(Long scanSnapshotId) {
- return new ScanContext(caseSensitive, scanSnapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Long startSnapshotId() {
return startSnapshotId;
}
- ScanContext startSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, id, endSnapshotId, asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Long endSnapshotId() {
return endSnapshotId;
}
- ScanContext endSnapshotId(Long id) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, id, asOfTimestamp, splitSize, splitLookback,
- splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Long asOfTimestamp() {
return asOfTimestamp;
}
- ScanContext asOfTimestamp(Long timestamp) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, timestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Long splitSize() {
return splitSize;
}
- ScanContext splitSize(Long size) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, size,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Integer splitLookback() {
return splitLookback;
}
- ScanContext splitLookback(Integer lookback) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- lookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, limit);
- }
-
Long splitOpenFileCost() {
return splitOpenFileCost;
}
- ScanContext splitOpenFileCost(Long fileCost) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, fileCost, nameMapping, projectedSchema, filterExpressions, limit);
+ boolean isStreaming() {
+ return isStreaming;
+ }
+
+ Duration monitorInterval() {
+ return monitorInterval;
}
String nameMapping() {
return nameMapping;
}
- ScanContext nameMapping(String mapping) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, mapping, projectedSchema, filterExpressions, limit);
+ Schema project() {
+ return schema;
}
- Schema projectedSchema() {
- return projectedSchema;
+ List<Expression> filters() {
+ return filters;
}
- ScanContext project(Schema schema) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, schema, filterExpressions, limit);
+ long limit() {
+ return limit;
}
- List<Expression> filterExpressions() {
- return filterExpressions;
+ ScanContext copyWithAppendsBetween(long newStartSnapshotId, long newEndSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(null)
+ .startSnapshotId(newStartSnapshotId)
+ .endSnapshotId(newEndSnapshotId)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitLookback(splitLookback)
+ .splitOpenFileCost(splitOpenFileCost)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .nameMapping(nameMapping)
+ .project(schema)
+ .filters(filters)
+ .limit(limit)
+ .build();
}
- ScanContext filterRows(List<Expression> filters) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filters, limit);
+ ScanContext copyWithSnapshotId(long newSnapshotId) {
+ return ScanContext.builder()
+ .caseSensitive(caseSensitive)
+ .useSnapshotId(newSnapshotId)
+ .startSnapshotId(null)
+ .endSnapshotId(null)
+ .asOfTimestamp(null)
+ .splitSize(splitSize)
+ .splitLookback(splitLookback)
+ .splitOpenFileCost(splitOpenFileCost)
+ .streaming(isStreaming)
+ .monitorInterval(monitorInterval)
+ .nameMapping(nameMapping)
+ .project(schema)
+ .filters(filters)
+ .limit(limit)
+ .build();
}
- long limit() {
- return limit;
+ static Builder builder() {
+ return new Builder();
}
- ScanContext limit(Long newLimit) {
- return new ScanContext(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize,
- splitLookback, splitOpenFileCost, nameMapping, projectedSchema, filterExpressions, newLimit);
+ static class Builder {
+ private boolean caseSensitive = CASE_SENSITIVE.defaultValue();
+ private Long snapshotId = SNAPSHOT_ID.defaultValue();
+ private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue();
+ private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue();
+ private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue();
+ private Long splitSize = SPLIT_SIZE.defaultValue();
+ private Integer splitLookback = SPLIT_LOOKBACK.defaultValue();
+ private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue();
+ private boolean isStreaming = STREAMING.defaultValue();
+ private Duration monitorInterval = MONITOR_INTERVAL.defaultValue();
+ private String nameMapping;
+ private Schema projectedSchema;
+ private List<Expression> filters;
+ private long limit = -1L;
+
+ private Builder() {
+ }
+
+ Builder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ Builder useSnapshotId(Long newSnapshotId) {
+ this.snapshotId = newSnapshotId;
+ return this;
+ }
+
+ Builder startSnapshotId(Long newStartSnapshotId) {
+ this.startSnapshotId = newStartSnapshotId;
+ return this;
+ }
+
+ Builder endSnapshotId(Long newEndSnapshotId) {
+ this.endSnapshotId = newEndSnapshotId;
+ return this;
+ }
+
+ Builder asOfTimestamp(Long newAsOfTimestamp) {
+ this.asOfTimestamp = newAsOfTimestamp;
+ return this;
+ }
+
+ Builder splitSize(Long newSplitSize) {
+ this.splitSize = newSplitSize;
+ return this;
+ }
+
+ Builder splitLookback(Integer newSplitLookback) {
+ this.splitLookback = newSplitLookback;
+ return this;
+ }
+
+ Builder splitOpenFileCost(Long newSplitOpenFileCost) {
+ this.splitOpenFileCost = newSplitOpenFileCost;
+ return this;
+ }
+
+ Builder streaming(boolean streaming) {
+ this.isStreaming = streaming;
+ return this;
+ }
+
+ Builder monitorInterval(Duration newMonitorInterval) {
+ this.monitorInterval = newMonitorInterval;
+ return this;
+ }
+
+ Builder nameMapping(String newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
+ Builder project(Schema newProjectedSchema) {
+ this.projectedSchema = newProjectedSchema;
+ return this;
+ }
+
+ Builder filters(List<Expression> newFilters) {
+ this.filters = newFilters;
+ return this;
+ }
+
+ Builder limit(long newLimit) {
+ this.limit = newLimit;
+ return this;
+ }
+
+ Builder fromProperties(Map<String, String> properties) {
+ Configuration config = new Configuration();
+ properties.forEach(config::setString);
+
+ return this.useSnapshotId(config.get(SNAPSHOT_ID))
+ .caseSensitive(config.get(CASE_SENSITIVE))
+ .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
+ .startSnapshotId(config.get(START_SNAPSHOT_ID))
+ .endSnapshotId(config.get(END_SNAPSHOT_ID))
+ .splitSize(config.get(SPLIT_SIZE))
+ .splitLookback(config.get(SPLIT_LOOKBACK))
+ .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
+ .streaming(config.get(STREAMING))
+ .monitorInterval(config.get(MONITOR_INTERVAL))
+ .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
+ }
+
+ public ScanContext build() {
+ return new ScanContext(caseSensitive, snapshotId, startSnapshotId,
+ endSnapshotId, asOfTimestamp, splitSize, splitLookback,
+ splitOpenFileCost, isStreaming, monitorInterval, nameMapping, projectedSchema,
+ filters, limit);
+ }
}
}
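Since ScanContext is package-private, callers normally reach it through string options. A small sketch of how the two new keys parse via fromProperties (the option values are hypothetical; the class would have to live in org.apache.iceberg.flink.source):

import java.time.Duration;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class ScanContextPropsSketch {
  static ScanContext fromHints() {
    // Hypothetical option map, e.g. coming from SQL hints or connector properties.
    Map<String, String> props = ImmutableMap.of(
        "streaming", "true",       // STREAMING option, defaults to false
        "monitor-interval", "5s"); // MONITOR_INTERVAL, parsed as a Flink Duration (default 10 s)

    ScanContext ctx = ScanContext.builder()
        .fromProperties(props)
        .build();

    assert ctx.isStreaming();
    assert ctx.monitorInterval().equals(Duration.ofSeconds(5));
    return ctx;
  }
}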
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
new file mode 100644
index 0000000..b31426a
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link FlinkInputFormat};
+ * it is responsible for:
+ *
+ * <ol>
+ * <li>Monitoring snapshots of the Iceberg table.</li>
+ * <li>Creating the {@link FlinkInputSplit splits} corresponding to the incremental files.</li>
+ * <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link StreamingReaderOperator}
+ * which can have parallelism greater than one.
+ */
+public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit> implements CheckpointedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingMonitorFunction.class);
+
+ private static final long INIT_LAST_SNAPSHOT_ID = -1L;
+
+ private final TableLoader tableLoader;
+ private final ScanContext scanContext;
+
+ private volatile boolean isRunning = true;
+
+ // The checkpoint thread is not the same thread that runs the function for SourceStreamTask now, so it's
+ // necessary to mark this field as volatile.
+ private volatile long lastSnapshotId = INIT_LAST_SNAPSHOT_ID;
+
+ private transient SourceContext<FlinkInputSplit> sourceContext;
+ private transient Table table;
+ private transient ListState<Long> lastSnapshotIdState;
+
+ public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext) {
+ Preconditions.checkArgument(scanContext.snapshotId() == null,
+ "Cannot set snapshot-id option for streaming reader");
+ Preconditions.checkArgument(scanContext.asOfTimestamp() == null,
+ "Cannot set as-of-timestamp option for streaming reader");
+ Preconditions.checkArgument(scanContext.endSnapshotId() == null,
+ "Cannot set end-snapshot-id option for streaming reader");
+ this.tableLoader = tableLoader;
+ this.scanContext = scanContext;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // Load iceberg table from table loader.
+ tableLoader.open();
+ table = tableLoader.loadTable();
+
+ // Initialize the flink state for last snapshot id.
+ lastSnapshotIdState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "snapshot-id-state",
+ LongSerializer.INSTANCE));
+
+ // Restore the last-snapshot-id from flink's state if possible.
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+ lastSnapshotId = lastSnapshotIdState.get().iterator().next();
+ } else if (scanContext.startSnapshotId() != null) {
+ Preconditions.checkNotNull(table.currentSnapshot(), "The table has no available snapshot.");
+
+ long currentSnapshotId = table.currentSnapshot().snapshotId();
+ Preconditions.checkState(SnapshotUtil.ancestorOf(table, currentSnapshotId, scanContext.startSnapshotId()),
+ "The option start-snapshot-id %s is not an ancestor of the current snapshot.", scanContext.startSnapshotId());
+
+ lastSnapshotId = scanContext.startSnapshotId();
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ lastSnapshotIdState.clear();
+ lastSnapshotIdState.add(lastSnapshotId);
+ }
+
+ @Override
+ public void run(SourceContext<FlinkInputSplit> ctx) throws Exception {
+ this.sourceContext = ctx;
+ while (isRunning) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ if (isRunning) {
+ monitorAndForwardSplits();
+ }
+ }
+ Thread.sleep(scanContext.monitorInterval().toMillis());
+ }
+ }
+
+ private void monitorAndForwardSplits() {
+ // Refresh the table to get the latest committed snapshot.
+ table.refresh();
+
+ Snapshot snapshot = table.currentSnapshot();
+ if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
+ long snapshotId = snapshot.snapshotId();
+
+ ScanContext newScanContext;
+ if (lastSnapshotId == INIT_LAST_SNAPSHOT_ID) {
+ newScanContext = scanContext.copyWithSnapshotId(snapshotId);
+ } else {
+ newScanContext = scanContext.copyWithAppendsBetween(lastSnapshotId, snapshotId);
+ }
+
+ FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, newScanContext);
+ for (FlinkInputSplit split : splits) {
+ sourceContext.collect(split);
+ }
+
+ lastSnapshotId = snapshotId;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // This covers the case where cancel() is called before run().
+ if (sourceContext != null) {
+ synchronized (sourceContext.getCheckpointLock()) {
+ isRunning = false;
+ }
+ } else {
+ isRunning = false;
+ }
+
+ // Release all the resources here.
+ if (tableLoader != null) {
+ try {
+ tableLoader.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ cancel();
+ }
+}
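Note on recovery semantics: because lastSnapshotId is checkpointed, a restored job does not re-read the table from scratch. In terms of monitorAndForwardSplits() above, the first cycle after failover emits only the delta, roughly:

// Sketch, assuming a newer snapshot was committed while the job was down.
ScanContext delta = scanContext.copyWithAppendsBetween(lastSnapshotId, table.currentSnapshot().snapshotId());
FlinkInputSplit[] splits = FlinkSplitGenerator.createInputSplits(table, delta);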
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
new file mode 100644
index 0000000..235b173
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.Queue;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The operator that reads the {@link FlinkInputSplit splits} received from the preceding {@link
+ * StreamingMonitorFunction}. Contrary to the {@link StreamingMonitorFunction}, which has a parallelism of 1,
+ * this operator can run with a parallelism greater than 1.
+ *
+ * <p>As soon as a split descriptor is received, it is put in a queue, and the {@link MailboxExecutor}
+ * is used to read the actual data of the split. This architecture separates the reading thread from the one
+ * processing the checkpoint barriers, thus removing any potential back-pressure.
+ */
+public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
+ implements OneInputStreamOperator<FlinkInputSplit, RowData> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingReaderOperator.class);
+
+ // It's the same thread that runs this operator and the checkpoint actions. We use this executor to schedule only
+ // one split for future reading, so that a new checkpoint can be triggered without blocking for a long time to
+ // exhaust all scheduled splits.
+ private final MailboxExecutor executor;
+ private FlinkInputFormat format;
+
+ private transient SourceFunction.SourceContext<RowData> sourceContext;
+
+ private transient ListState<FlinkInputSplit> inputSplitsState;
+ private transient Queue<FlinkInputSplit> splits;
+
+ // Splits are read by the same thread that calls processElement. Each read task is submitted to that thread by
+ // adding it to the executor. This state is used to ensure that only one read task is in that queue at a time, so
+ // that read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set
+ // to RUNNING. When there are no more files to read, this will be set to IDLE.
+ private transient SplitState currentSplitState;
+
+ private StreamingReaderOperator(FlinkInputFormat format, ProcessingTimeService timeService,
+ MailboxExecutor mailboxExecutor) {
+ this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
+ this.processingTimeService = timeService;
+ this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ // TODO Replace Java serialization with Avro approach to keep state compatibility.
+ // See issue: https://github.com/apache/iceberg/issues/1698
+ inputSplitsState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>("splits", new JavaSerializer<>()));
+
+ // Initialize the current split state to IDLE.
+ currentSplitState = SplitState.IDLE;
+
+ // Recover splits state from flink state backend if possible.
+ splits = Lists.newLinkedList();
+ if (context.isRestored()) {
+ int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+ LOG.info("Restoring state for the {} (taskIdx: {}).", getClass().getSimpleName(), subtaskIdx);
+
+ for (FlinkInputSplit split : inputSplitsState.get()) {
+ splits.add(split);
+ }
+ }
+
+ this.sourceContext = StreamSourceContexts.getSourceContext(
+ getOperatorConfig().getTimeCharacteristic(),
+ getProcessingTimeService(),
+ new Object(), // no actual locking needed
+ getContainingTask().getStreamStatusMaintainer(),
+ output,
+ getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+ -1);
+
+ // Enqueue to process the recovered input splits.
+ enqueueProcessSplits();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+
+ inputSplitsState.clear();
+ inputSplitsState.addAll(Lists.newArrayList(splits));
+ }
+
+ @Override
+ public void processElement(StreamRecord<FlinkInputSplit> element) {
+ splits.add(element.getValue());
+ enqueueProcessSplits();
+ }
+
+ private void enqueueProcessSplits() {
+ if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
+ currentSplitState = SplitState.RUNNING;
+ executor.execute(this::processSplits, this.getClass().getSimpleName());
+ }
+ }
+
+ private void processSplits() throws IOException {
+ FlinkInputSplit split = splits.poll();
+ if (split == null) {
+ currentSplitState = SplitState.IDLE;
+ return;
+ }
+
+ format.open(split);
+ try {
+ RowData nextElement = null;
+ while (!format.reachedEnd()) {
+ nextElement = format.nextRecord(nextElement);
+ sourceContext.collect(nextElement);
+ }
+ } finally {
+ currentSplitState = SplitState.IDLE;
+ format.close();
+ }
+
+ // Re-schedule to process the next split.
+ enqueueProcessSplits();
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ // We do nothing because we emit our own watermarks if needed.
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ super.dispose();
+
+ if (format != null) {
+ format.close();
+ format.closeInputFormat();
+ format = null;
+ }
+
+ sourceContext = null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ output.close();
+ if (sourceContext != null) {
+ sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+ sourceContext.close();
+ sourceContext = null;
+ }
+ }
+
+ static OneInputStreamOperatorFactory<FlinkInputSplit, RowData> factory(FlinkInputFormat format) {
+ return new OperatorFactory(format);
+ }
+
+ private enum SplitState {
+ IDLE, RUNNING
+ }
+
+ private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
+ implements YieldingOperatorFactory<RowData>, OneInputStreamOperatorFactory<FlinkInputSplit, RowData> {
+
+ private final FlinkInputFormat format;
+
+ private transient MailboxExecutor mailboxExecutor;
+
+ private OperatorFactory(FlinkInputFormat format) {
+ this.format = format;
+ }
+
+ @Override
+ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+ this.mailboxExecutor = mailboxExecutor;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
+ StreamingReaderOperator operator = new StreamingReaderOperator(format, processingTimeService, mailboxExecutor);
+ operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+ return (O) operator;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return StreamingReaderOperator.class;
+ }
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 6782267..8302be3 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -72,8 +72,17 @@ public abstract class FlinkTestBase extends AbstractTestBase {
return tEnv;
}
+ protected static TableResult exec(TableEnvironment env, String query, Object... args) {
+ return env.executeSql(String.format(query, args));
+ }
+
+ protected TableResult exec(String query, Object... args) {
+ return exec(getTableEnv(), query, args);
+ }
+
protected List<Object[]> sql(String query, Object... args) {
- TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
+ TableResult tableResult = exec(query, args);
+
tableResult.getJobClient().ifPresent(c -> {
try {
c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
@@ -81,12 +90,17 @@ public abstract class FlinkTestBase extends AbstractTestBase {
throw new RuntimeException(e);
}
});
- CloseableIterator<Row> iter = tableResult.collect();
+
List<Object[]> results = Lists.newArrayList();
- while (iter.hasNext()) {
- Row row = iter.next();
- results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+ try (CloseableIterator<Row> iter = tableResult.collect()) {
+ while (iter.hasNext()) {
+ Row row = iter.next();
+ results.add(IntStream.range(0, row.getArity()).mapToObj(row::getField).toArray(Object[]::new));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
+
return results;
}
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java b/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
index f3df428..5f7ae29 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestTableLoader.java
@@ -26,6 +26,10 @@ import org.apache.iceberg.TestTables;
public class TestTableLoader implements TableLoader {
private File dir;
+ public static TableLoader of(String dir) {
+ return new TestTableLoader(dir);
+ }
+
public TestTableLoader(String dir) {
this.dir = new File(dir);
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
new file mode 100644
index 0000000..91ce983
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStreamScanSql extends FlinkCatalogTestBase {
+ private static final String TABLE = "test_table";
+ private static final FileFormat FORMAT = FileFormat.PARQUET;
+
+ private TableEnvironment tEnv;
+
+ public TestStreamScanSql(String catalogName, Namespace baseNamespace) {
+ super(catalogName, baseNamespace);
+ }
+
+ @Override
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ if (tEnv == null) {
+ EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(400);
+
+ StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+ streamTableEnv.getConfig()
+ .getConfiguration()
+ .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
+ tEnv = streamTableEnv;
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ @Before
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ }
+
+ @After
+ public void clean() {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE);
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+ }
+
+ private void insertRows(String partition, Table table, Row... rows) throws IOException {
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, TEMPORARY_FOLDER);
+
+ GenericRecord gRecord = GenericRecord.create(table.schema());
+ List<Record> records = Lists.newArrayList();
+ for (Row row : rows) {
+ records.add(gRecord.copy(
+ "id", row.getField(0),
+ "data", row.getField(1),
+ "dt", row.getField(2)
+ ));
+ }
+
+ if (partition != null) {
+ appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
+ } else {
+ appender.appendToTable(records);
+ }
+ }
+
+ private void insertRows(Table table, Row... rows) throws IOException {
+ insertRows(null, table, rows);
+ }
+
+ private void assertRows(List<Row> expectedRows, Iterator<Row> iterator) {
+ for (Row expectedRow : expectedRows) {
+ Assert.assertTrue("Should have more records", iterator.hasNext());
+
+ Row actualRow = iterator.next();
+ Assert.assertEquals("Should have expected fields", 3, actualRow.getArity());
+ Assert.assertEquals("Should have expected id", expectedRow.getField(0), actualRow.getField(0));
+ Assert.assertEquals("Should have expected data", expectedRow.getField(1), actualRow.getField(1));
+ Assert.assertEquals("Should have expected dt", expectedRow.getField(2), actualRow.getField(2));
+ }
+ }
+
+ @Test
+ public void testUnPartitionedTable() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+ try (CloseableIterator<Row> iterator = result.collect()) {
+
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ insertRows(table, row1);
+ assertRows(ImmutableList.of(row1), iterator);
+
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+ insertRows(table, row2);
+ assertRows(ImmutableList.of(row2), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @Test
+ public void testPartitionedTable() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) PARTITIONED BY (dt)", TABLE);
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ insertRows("2021-01-01", table, row1);
+ assertRows(ImmutableList.of(row1), iterator);
+
+ Row row2 = Row.of(2, "bbb", "2021-01-02");
+ insertRows("2021-01-02", table, row2);
+ assertRows(ImmutableList.of(row2), iterator);
+
+ Row row3 = Row.of(1, "aaa", "2021-01-02");
+ insertRows("2021-01-02", table, row3);
+ assertRows(ImmutableList.of(row3), iterator);
+
+ Row row4 = Row.of(2, "bbb", "2021-01-01");
+ insertRows("2021-01-01", table, row4);
+ assertRows(ImmutableList.of(row4), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @Test
+ public void testConsumeFromBeginning() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+ insertRows(table, row1, row2);
+
+ TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ assertRows(ImmutableList.of(row1, row2), iterator);
+
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ insertRows(table, row3);
+ assertRows(ImmutableList.of(row3), iterator);
+
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+ insertRows(table, row4);
+ assertRows(ImmutableList.of(row4), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+
+ @Test
+ public void testConsumeFromStartSnapshotId() throws Exception {
+ sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+ Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+ // Produce two snapshots.
+ Row row1 = Row.of(1, "aaa", "2021-01-01");
+ Row row2 = Row.of(2, "bbb", "2021-01-01");
+ insertRows(table, row1);
+ insertRows(table, row2);
+
+ long startSnapshotId = table.currentSnapshot().snapshotId();
+
+ Row row3 = Row.of(3, "ccc", "2021-01-01");
+ Row row4 = Row.of(4, "ddd", "2021-01-01");
+ insertRows(table, row3, row4);
+
+ TableResult result = exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', " +
+ "'start-snapshot-id'='%d')*/", TABLE, startSnapshotId);
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ // Rows committed up to and including the start snapshot (row1, row2) are excluded.
+ assertRows(ImmutableList.of(row3, row4), iterator);
+
+ Row row5 = Row.of(5, "eee", "2021-01-01");
+ Row row6 = Row.of(6, "fff", "2021-01-01");
+ insertRows(table, row5, row6);
+ assertRows(ImmutableList.of(row5, row6), iterator);
+
+ Row row7 = Row.of(7, "ggg", "2021-01-01");
+ insertRows(table, row7);
+ assertRows(ImmutableList.of(row7), iterator);
+ }
+ result.getJobClient().ifPresent(JobClient::cancel);
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
new file mode 100644
index 0000000..dcd41dc
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStreamingMonitorFunction extends TableTestBase {
+
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get())
+ );
+ private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;
+ private static final long WAIT_TIME_MILLIS = 10 * 1000L;
+
+ @Parameterized.Parameters(name = "FormatVersion={0}")
+ public static Iterable<Object[]> parameters() {
+ return ImmutableList.of(
+ new Object[] {1},
+ new Object[] {2}
+ );
+ }
+
+ public TestStreamingMonitorFunction(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Before
+ @Override
+ public void setupTable() throws IOException {
+ this.tableDir = temp.newFolder();
+ this.metadataDir = new File(tableDir, "metadata");
+ Assert.assertTrue(tableDir.delete());
+
+ // Construct the iceberg table.
+ table = create(SCHEMA, PartitionSpec.unpartitioned());
+ }
+
+ private void runSourceFunctionInTask(TestSourceContext sourceContext, StreamingMonitorFunction function) {
+ Thread task = new Thread(() -> {
+ try {
+ function.run(sourceContext);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ task.start();
+ }
+
+ @Test
+ public void testConsumeWithoutStartSnapshotId() throws Exception {
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+ ScanContext scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .build();
+
+ StreamingMonitorFunction function = createFunction(scanContext);
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ runSourceFunctionInTask(sourceContext, function);
+
+ Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+ Thread.sleep(1000L);
+
+ // Stop the stream task.
+ function.close();
+
+ Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+ TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+ }
+ }
+
+ @Test
+ public void testConsumeFromStartSnapshotId() throws Exception {
+ // Commit the first five transactions.
+ generateRecordsAndCommitTxn(5);
+ long startSnapshotId = table.currentSnapshot().snapshotId();
+
+ // Commit the next five transactions.
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(5);
+
+ ScanContext scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .startSnapshotId(startSnapshotId)
+ .build();
+
+ StreamingMonitorFunction function = createFunction(scanContext);
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ runSourceFunctionInTask(sourceContext, function);
+
+ Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+ Thread.sleep(1000L);
+
+ // Stop the stream task.
+ function.close();
+
+ Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+ TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+ }
+ }
+
+ @Test
+ public void testCheckpointRestore() throws Exception {
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+ ScanContext scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .build();
+
+ StreamingMonitorFunction func = createFunction(scanContext);
+ OperatorSubtaskState state;
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(func)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ runSourceFunctionInTask(sourceContext, func);
+
+ Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+ Thread.sleep(1000L);
+
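+ // Snapshot the operator state so that a restored function can continue from the last consumed snapshot.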
+ state = harness.snapshot(1, 1);
+
+ // Stop the stream task.
+ func.close();
+
+ Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+ TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(recordsList)), SCHEMA);
+ }
+
+ List<List<Record>> newRecordsList = generateRecordsAndCommitTxn(10);
+ StreamingMonitorFunction newFunc = createFunction(scanContext);
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(newFunc)) {
+ harness.setup();
+ // Recover to process the remaining snapshots.
+ harness.initializeState(state);
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ runSourceFunctionInTask(sourceContext, newFunc);
+
+ Assert.assertTrue("Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
+ Thread.sleep(1000L);
+
+ // Stop the stream task.
+ newFunc.close();
+
+ Assert.assertEquals("Should produce the expected splits", 1, sourceContext.splits.size());
+ TestFlinkScan.assertRecords(sourceContext.toRows(), Lists.newArrayList(Iterables.concat(newRecordsList)), SCHEMA);
+ }
+ }
+
+ private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
+ List<List<Record>> expectedRecords = Lists.newArrayList();
+ for (int i = 0; i < commitTimes; i++) {
+ List<Record> records = RandomGenericData.generate(SCHEMA, 100, 0L);
+ expectedRecords.add(records);
+
+ // Commit those records to the Iceberg table.
+ writeRecords(records);
+ }
+ return expectedRecords;
+ }
+
+ private void writeRecords(List<Record> records) throws IOException {
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, DEFAULT_FORMAT, temp);
+ appender.appendToTable(records);
+ }
+
+ private StreamingMonitorFunction createFunction(ScanContext scanContext) {
+ return new StreamingMonitorFunction(TestTableLoader.of(tableDir.getAbsolutePath()), scanContext);
+ }
+
+ private AbstractStreamOperatorTestHarness<FlinkInputSplit> createHarness(StreamingMonitorFunction function)
+ throws Exception {
+ StreamSource<FlinkInputSplit, StreamingMonitorFunction> streamSource = new StreamSource<>(function);
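+ // maxParallelism = 1, parallelism = 1, subtask index = 0.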
+ return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
+ }
+
+ private class TestSourceContext implements SourceFunction.SourceContext<FlinkInputSplit> {
+ private final List<FlinkInputSplit> splits = Lists.newArrayList();
+ private final Object checkpointLock = new Object();
+ private final CountDownLatch latch;
+
+ TestSourceContext(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void collect(FlinkInputSplit element) {
+ splits.add(element);
+ latch.countDown();
+ }
+
+ @Override
+ public void collectWithTimestamp(FlinkInputSplit element, long timestamp) {
+ collect(element);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
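+ // Re-read every collected split through the FlinkInputFormat and convert the records into rows for assertion.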
+ private List<Row> toRows() throws IOException {
+ FlinkInputFormat format = FlinkSource.forRowData()
+ .tableLoader(TestTableLoader.of(tableDir.getAbsolutePath()))
+ .buildFormat();
+
+ List<Row> rows = Lists.newArrayList();
+ for (FlinkInputSplit split : splits) {
+ format.open(split);
+
+ RowData element = null;
+ try {
+ while (!format.reachedEnd()) {
+ element = format.nextRecord(element);
+ rows.add(Row.of(element.getInt(0), element.getString(1).toString()));
+ }
+ } finally {
+ format.close();
+ }
+ }
+
+ return rows;
+ }
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
new file mode 100644
index 0000000..112f021
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.TestTableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStreamingReaderOperator extends TableTestBase {
+
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get())
+ );
+ private static final FileFormat DEFAULT_FORMAT = FileFormat.PARQUET;
+
+ @Parameterized.Parameters(name = "FormatVersion={0}")
+ public static Iterable<Object[]> parameters() {
+ return ImmutableList.of(
+ new Object[] {1},
+ new Object[] {2}
+ );
+ }
+
+ public TestStreamingReaderOperator(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Before
+ @Override
+ public void setupTable() throws IOException {
+ this.tableDir = temp.newFolder();
+ this.metadataDir = new File(tableDir, "metadata");
+ Assert.assertTrue(tableDir.delete());
+
+ // Construct the Iceberg table.
+ table = create(SCHEMA, PartitionSpec.unpartitioned());
+ }
+
+ @Test
+ public void testProcessAllRecords() throws Exception {
+ List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(10);
+
+ List<FlinkInputSplit> splits = generateSplits();
+ Assert.assertEquals("Should have 10 splits", 10, splits.size());
+
+ try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+ harness.setup();
+ harness.open();
+
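+ // The stepping mailbox lets the test drive the operator's mailbox one enqueued action at a time.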
+ SteppingMailboxProcessor processor = createLocalMailbox(harness);
+
+ List<Record> expected = Lists.newArrayList();
+ for (int i = 0; i < splits.size(); i++) {
+ // Process this element to enqueue a read action into the mailbox.
+ harness.processElement(splits.get(i), -1);
+
+ // Run the mailbox once to read all records from the given split.
+ Assert.assertTrue("Should have processed 1 split", processor.runMailboxStep());
+
+ // Assert the output has expected elements.
+ expected.addAll(expectedRecords.get(i));
+ TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+ }
+ }
+ }
+
+ @Test
+ public void testTriggerCheckpoint() throws Exception {
+ // Receive the emitted splits: split0, split1 and split2. The checkpoint request is triggered while reading
+ // records from split0.
+ List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(3);
+
+ List<FlinkInputSplit> splits = generateSplits();
+ Assert.assertEquals("Should have 3 splits", 3, splits.size());
+
+ long timestamp = 0;
+ try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+ harness.setup();
+ harness.open();
+
+ SteppingMailboxProcessor processor = createLocalMailbox(harness);
+
+ harness.processElement(splits.get(0), ++timestamp);
+ harness.processElement(splits.get(1), ++timestamp);
+ harness.processElement(splits.get(2), ++timestamp);
+
+ // Trigger the state snapshot; it will run once all records from split0 have been read.
+ processor.getMainMailboxExecutor()
+ .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+ Assert.assertTrue("Should have processed the split0", processor.runMailboxStep());
+ Assert.assertTrue("Should have processed the snapshot state action", processor.runMailboxStep());
+
+ TestFlinkScan.assertRecords(readOutputValues(harness), expectedRecords.get(0), SCHEMA);
+
+ // Read records from split1.
+ Assert.assertTrue("Should have processed the split1", processor.runMailboxStep());
+
+ // Read records from split2.
+ Assert.assertTrue("Should have processed the split2", processor.runMailboxStep());
+
+ TestFlinkScan.assertRecords(readOutputValues(harness),
+ Lists.newArrayList(Iterables.concat(expectedRecords)), SCHEMA);
+ }
+ }
+
+ @Test
+ public void testCheckpointRestore() throws Exception {
+ List<List<Record>> expectedRecords = generateRecordsAndCommitTxn(15);
+
+ List<FlinkInputSplit> splits = generateSplits();
+ Assert.assertEquals("Should have 10 splits", 15, splits.size());
+
+ OperatorSubtaskState state;
+ List<Record> expected = Lists.newArrayList();
+ try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+ harness.setup();
+ harness.open();
+
+ // Enqueue all the splits.
+ for (FlinkInputSplit split : splits) {
+ harness.processElement(split, -1);
+ }
+
+ // Read all records from the first five splits.
+ SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+ for (int i = 0; i < 5; i++) {
+ expected.addAll(expectedRecords.get(i));
+ Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep());
+
+ TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+ }
+
+ // Snapshot the state now; there are 10 splits left in the state.
+ state = harness.snapshot(1, 1);
+ }
+
+ expected.clear();
+ try (OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = createReader()) {
+ harness.setup();
+ // Recover to process the remaining splits.
+ harness.initializeState(state);
+ harness.open();
+
+ SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+
+ for (int i = 5; i < 10; i++) {
+ expected.addAll(expectedRecords.get(i));
+ Assert.assertTrue("Should have processed one split#" + i, localMailbox.runMailboxStep());
+
+ TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+ }
+
+ // Let's process the final 5 splits now.
+ for (int i = 10; i < 15; i++) {
+ expected.addAll(expectedRecords.get(i));
+ harness.processElement(splits.get(i), 1);
+
+ Assert.assertTrue("Should have processed the split#" + i, localMailbox.runMailboxStep());
+ TestFlinkScan.assertRecords(readOutputValues(harness), expected, SCHEMA);
+ }
+ }
+ }
+
+ private List<Row> readOutputValues(OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
+ List<Row> results = Lists.newArrayList();
+ for (RowData rowData : harness.extractOutputValues()) {
+ results.add(Row.of(rowData.getInt(0), rowData.getString(1).toString()));
+ }
+ return results;
+ }
+
+ private List<List<Record>> generateRecordsAndCommitTxn(int commitTimes) throws IOException {
+ List<List<Record>> expectedRecords = Lists.newArrayList();
+ for (int i = 0; i < commitTimes; i++) {
+ List<Record> records = RandomGenericData.generate(SCHEMA, 100, 0L);
+ expectedRecords.add(records);
+
+ // Commit those records to the Iceberg table.
+ writeRecords(records);
+ }
+ return expectedRecords;
+ }
+
+ private void writeRecords(List<Record> records) throws IOException {
+ GenericAppenderHelper appender = new GenericAppenderHelper(table, DEFAULT_FORMAT, temp);
+ appender.appendToTable(records);
+ }
+
+ private List<FlinkInputSplit> generateSplits() {
+ List<FlinkInputSplit> inputSplits = Lists.newArrayList();
+
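+ // currentAncestors returns snapshot ids from newest to oldest, so iterate in reverse to visit them chronologically.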
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ for (int i = snapshotIds.size() - 1; i >= 0; i--) {
+ ScanContext scanContext;
+ if (i == snapshotIds.size() - 1) {
+ // Generate the splits from the oldest snapshot.
+ scanContext = ScanContext.builder()
+ .useSnapshotId(snapshotIds.get(i))
+ .build();
+ } else {
+ // Generate the incremental splits between the previous snapshot (exclusive) and this snapshot (inclusive).
+ scanContext = ScanContext.builder()
+ .startSnapshotId(snapshotIds.get(i + 1))
+ .endSnapshotId(snapshotIds.get(i))
+ .build();
+ }
+
+ Collections.addAll(inputSplits, FlinkSplitGenerator.createInputSplits(table, scanContext));
+ }
+
+ return inputSplits;
+ }
+
+ private OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> createReader() throws Exception {
+ // This input format is used to open the emitted splits.
+ FlinkInputFormat inputFormat = FlinkSource.forRowData()
+ .tableLoader(TestTableLoader.of(tableDir.getAbsolutePath()))
+ .buildFormat();
+
+ OneInputStreamOperatorFactory<FlinkInputSplit, RowData> factory = StreamingReaderOperator.factory(inputFormat);
+ OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness = new OneInputStreamOperatorTestHarness<>(
+ factory, 1, 1, 0);
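+ // The reader emits records without event-time timestamps, so processing time is sufficient here.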
+ harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ return harness;
+ }
+
+ private SteppingMailboxProcessor createLocalMailbox(
+ OneInputStreamOperatorTestHarness<FlinkInputSplit, RowData> harness) {
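+ // Suspend the default action immediately so each runMailboxStep() call processes exactly one mail.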
+ return new SteppingMailboxProcessor(
+ MailboxDefaultAction.Controller::suspendDefaultAction,
+ harness.getTaskMailbox(),
+ StreamTaskActionExecutor.IMMEDIATE);
+ }
+}