Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/04 05:05:09 UTC
[incubator-paimon] branch master updated: [core] Introduce consume-id for restore consuming and reserve snapshot (#1022)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 50d562252 [core] Introduce consume-id for restore consuming and reserve snapshot (#1022)
50d562252 is described below
commit 50d56225201dd934e1e40e5932ef3bde2cb38436
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu May 4 13:05:04 2023 +0800
[core] Introduce consume-id for restore consuming and reserve snapshot (#1022)
---
docs/content/how-to/querying-tables.md | 23 ++
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 11 +
.../java/org/apache/paimon/consumer/Consumer.java | 69 ++++++
.../apache/paimon/consumer/ConsumerManager.java | 100 +++++++++
.../paimon/operation/FileStoreExpireImpl.java | 10 +
.../table/source/AbstractInnerTableScan.java | 14 ++
.../table/source/InnerStreamTableScanImpl.java | 15 ++
.../paimon/table/source/StreamTableScan.java | 17 +-
.../table/source/snapshot/SnapshotSplitReader.java | 3 +
.../source/snapshot/SnapshotSplitReaderImpl.java | 10 +
.../apache/paimon/table/system/AuditLogTable.java | 11 +
.../org/apache/paimon/utils/SnapshotManager.java | 8 +
.../paimon/consumer/ConsumerManagerTest.java | 64 ++++++
.../paimon/table/FileStoreTableTestBase.java | 63 ++++++
.../java/org/apache/paimon/flink/FlinkRowData.java | 7 +-
.../paimon/flink/source/DataTableSource.java | 4 +-
.../paimon/flink/source/FlinkSourceBuilder.java | 119 ++++++----
.../flink/source/operator/MonitorFunction.java | 222 +++++++++++++++++++
.../paimon/flink/source/operator/ReadOperator.java | 71 ++++++
.../apache/paimon/flink/utils/JavaSerializer.java | 161 ++++++++++++++
.../apache/paimon/flink/utils/JavaTypeInfo.java | 128 +++++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 24 ++
.../org/apache/paimon/flink/FileStoreITCase.java | 17 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 4 +
.../flink/source/operator/OperatorSourceTest.java | 243 +++++++++++++++++++++
.../paimon/flink/utils/JavaSerializerTest.java | 86 ++++++++
.../paimon/flink/utils/JavaTypeInfoTest.java | 30 ++-
28 files changed, 1478 insertions(+), 62 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index cf8aad5e6..6614b8fa7 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -130,6 +130,29 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
{{< /tabs >}}
+## Consumer ID
+
+{{< hint info >}}
+This is an experimental feature.
+{{< /hint >}}
+
+You can specify the `consumer-id` for a streaming read of a table:
+```sql
+SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
+```
+
+When streaming reads a Paimon table, the next snapshot id to be consumed is recorded into the file system. This has
+several advantages:
+
+1. When the previous job is stopped, the newly started job can continue to consume from the previous progress without
+   resuming from the state. The new read will start from the next snapshot id found in the consumer file.
+2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system,
+   and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.
+
+{{< hint info >}}
+NOTE: If a consumer will not be used anymore, please delete it; otherwise it will hold back the expiration
+of snapshots. The consumer file is located in `${table-path}/consumer/consumer-${consumer-id}`.
+{{< /hint >}}
+
## System Tables
System tables contain metadata and information about each table, such as the snapshots created and the options in use. Users can access system tables with batch queries.
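
As a quick illustration of the Consumer ID section above: the consumer file persists just the JSON form of the `Consumer` class introduced below in this patch. A minimal sketch of the round trip (an editorial sketch, not part of the original docs):

```java
// Sketch based on org.apache.paimon.consumer.Consumer from this commit.
Consumer consumer = new Consumer(5L);               // next snapshot id to consume
String json = consumer.toJson();                    // serializes the "nextSnapshot" field
long next = Consumer.fromJson(json).nextSnapshot(); // == 5
```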
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 16138d13b..6c704aa38 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,6 +68,12 @@
<td>Integer</td>
<td>Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
</tr>
+ <tr>
+ <td><h5>consumer-id</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Consumer id for recording the offset of consumption in the storage.</td>
+ </tr>
<tr>
<td><h5>continuous.discovery-interval</h5></td>
<td style="word-wrap: break-word;">1 s</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index dc2e310c8..7bb992cfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -564,6 +564,13 @@ public class CoreOptions implements Serializable {
.defaultValue(1024)
.withDescription("Read batch size for orc and parquet.");
+ public static final ConfigOption<String> CONSUMER_ID =
+ key("consumer-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Consumer id for recording the offset of consumption in the storage.");
+
@Deprecated
@ExcludeFromDocumentation("For compatibility with older versions")
public static final ConfigOption<Boolean> APPEND_ONLY_ASSERT_DISORDER =
@@ -877,6 +884,10 @@ public class CoreOptions implements Serializable {
return options.get(READ_BATCH_SIZE);
}
+ public String consumerId() {
+ return options.get(CONSUMER_ID);
+ }
+
public static StreamingReadMode streamReadType(Options options) {
return options.get(STREAMING_READ_MODE);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
new file mode 100644
index 000000000..82a25c0af
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Consumer which contains next snapshot. */
+public class Consumer {
+
+ private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";
+
+ private final long nextSnapshot;
+
+ @JsonCreator
+ public Consumer(@JsonProperty(FIELD_NEXT_SNAPSHOT) long nextSnapshot) {
+ this.nextSnapshot = nextSnapshot;
+ }
+
+ @JsonGetter(FIELD_NEXT_SNAPSHOT)
+ public long nextSnapshot() {
+ return nextSnapshot;
+ }
+
+ public String toJson() {
+ return JsonSerdeUtil.toJson(this);
+ }
+
+ public static Consumer fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, Consumer.class);
+ }
+
+ public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
+ try {
+ if (!fileIO.exists(path)) {
+ return Optional.empty();
+ }
+
+ String json = fileIO.readFileUtf8(path);
+ return Optional.of(Consumer.fromJson(json));
+ } catch (IOException e) {
+ throw new RuntimeException("Fails to read snapshot from path " + path, e);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
new file mode 100644
index 000000000..273a32dfc
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/** Manage consumer groups. */
+public class ConsumerManager implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String CONSUMER_PREFIX = "consumer-";
+
+ private final FileIO fileIO;
+ private final Path tablePath;
+
+ public ConsumerManager(FileIO fileIO, Path tablePath) {
+ this.fileIO = fileIO;
+ this.tablePath = tablePath;
+ }
+
+ public Optional<Consumer> consumer(String consumerId) {
+ return Consumer.fromPath(fileIO, consumerPath(consumerId));
+ }
+
+ public void recordConsumer(String consumerId, Consumer consumer) {
+ try (PositionOutputStream out = fileIO.newOutputStream(consumerPath(consumerId), true)) {
+ OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
+ writer.write(consumer.toJson());
+ writer.flush();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public OptionalLong minNextSnapshot() {
+ try {
+ Path directory = consumerDirectory();
+ if (!fileIO.exists(directory)) {
+ return OptionalLong.empty();
+ }
+
+ FileStatus[] statuses = fileIO.listStatus(directory);
+
+ if (statuses == null) {
+ throw new RuntimeException(
+ String.format(
+ "The return value is null of the listStatus for the '%s' directory.",
+ directory));
+ }
+
+ return Arrays.stream(statuses)
+ .map(FileStatus::getPath)
+ .filter(path -> path.getName().startsWith(CONSUMER_PREFIX))
+ .map(path -> Consumer.fromPath(fileIO, path))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .mapToLong(Consumer::nextSnapshot)
+ .reduce(Math::min);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private Path consumerDirectory() {
+ return new Path(tablePath + "/consumer");
+ }
+
+ private Path consumerPath(String consumerId) {
+ return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX + consumerId);
+ }
+}
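
A minimal usage sketch of `ConsumerManager`, mirroring the `ConsumerManagerTest` added later in this patch (the table path is illustrative):

```java
// Record progress for two consumers, then ask for the minimum next snapshot.
ConsumerManager manager =
        new ConsumerManager(LocalFileIO.create(), new Path("/tmp/warehouse/t"));
manager.recordConsumer("id1", new Consumer(5));
manager.recordConsumer("id2", new Consumer(8));
OptionalLong min = manager.minNextSnapshot(); // OptionalLong.of(5): snapshot 5 must be kept
```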
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 8f0122e90..d31a1983a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -45,6 +46,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -71,6 +73,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
+ private final ConsumerManager consumerManager;
private final ManifestFile manifestFile;
private final ManifestList manifestList;
@@ -91,6 +94,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
this.millisRetained = millisRetained;
this.pathFactory = pathFactory;
this.snapshotManager = snapshotManager;
+ this.consumerManager =
+ new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
}
@@ -135,6 +140,11 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
private void expireUntil(long earliestId, long endExclusiveId) {
+ OptionalLong minNextSnapshot = consumerManager.minNextSnapshot();
+ if (minNextSnapshot.isPresent()) {
+ endExclusiveId = Math.min(minNextSnapshot.getAsLong(), endExclusiveId);
+ }
+
if (endExclusiveId <= earliestId) {
// No expire happens:
// write the hint file in order to see the earliest snapshot directly next time
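
In effect, expiration is now capped by the slowest consumer. For example, if the registered consumers have recorded next snapshots 5 and 8 while expiration would otherwise remove everything below snapshot 7, `endExclusiveId` becomes `min(5, 7) = 5`, so snapshots 5 and 6 stay on disk until the slower consumer advances.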
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 14066239f..250131467 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
@@ -35,6 +37,8 @@ import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.utils.Preconditions;
+import java.util.Optional;
+
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
@@ -65,6 +69,16 @@ public abstract class AbstractInnerTableScan implements InnerTableScan {
return new ContinuousCompactorStartingScanner();
}
+ // read from consumer id
+ String consumerId = options.consumerId();
+ if (consumerId != null) {
+ ConsumerManager consumerManager = snapshotSplitReader.consumerManager();
+ Optional<Consumer> consumer = consumerManager.consumer(consumerId);
+ if (consumer.isPresent()) {
+ return new ContinuousFromSnapshotStartingScanner(consumer.get().nextSnapshot());
+ }
+ }
+
CoreOptions.StartupMode startupMode = options.startupMode();
switch (startupMode) {
case LATEST_FULL:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 8c0f1ec31..dba177076 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -203,4 +204,18 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
public void restore(@Nullable Long nextSnapshotId) {
this.nextSnapshotId = nextSnapshotId;
}
+
+ @Override
+ public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
+ if (nextSnapshot == null) {
+ return;
+ }
+
+ String consumerId = options.consumerId();
+ if (consumerId != null) {
+ snapshotSplitReader
+ .consumerManager()
+ .recordConsumer(consumerId, new Consumer(nextSnapshot));
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 44f0ce43a..f22f70058 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.utils.Restorable;
+import javax.annotation.Nullable;
+
/**
* {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
*
@@ -29,4 +31,17 @@ import org.apache.paimon.utils.Restorable;
* @since 0.4.0
*/
@Public
-public interface StreamTableScan extends TableScan, Restorable<Long> {}
+public interface StreamTableScan extends TableScan, Restorable<Long> {
+
+ /** Restore from checkpoint next snapshot id. */
+ @Override
+ void restore(@Nullable Long nextSnapshotId);
+
+ /** Checkpoint to return next snapshot id. */
+ @Nullable
+ @Override
+ Long checkpoint();
+
+ /** Notifies the checkpoint complete with next snapshot id. */
+ void notifyCheckpointComplete(@Nullable Long nextSnapshot);
+}
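
A hedged sketch of the call sequence this interface implies (the concrete driver is the `MonitorFunction` added further down; variable names are illustrative):

```java
// Illustrative checkpoint lifecycle for a StreamTableScan.
StreamTableScan scan = readBuilder.newStreamScan();
scan.restore(restoredNextSnapshotId);        // nullable: resume from checkpointed state
List<Split> splits = scan.plan().splits();   // plan the incremental splits
Long nextSnapshot = scan.checkpoint();       // next snapshot id to put into the checkpoint
// ... later, once the checkpoint is durable:
scan.notifyCheckpointComplete(nextSnapshot); // records progress into the consumer file
```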
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
index d813ffcc7..87def5fa1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReader.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
@@ -29,6 +30,8 @@ import java.util.List;
/** Read splits from specified {@link Snapshot} with given configuration. */
public interface SnapshotSplitReader {
+ ConsumerManager consumerManager();
+
SnapshotSplitReader withSnapshot(long snapshotId);
SnapshotSplitReader withFilter(Predicate predicate);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
index 5b3176e50..6ab594501 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotSplitReaderImpl.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
@@ -47,10 +48,12 @@ import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
/** Implementation of {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
+
private final FileStoreScan scan;
private final TableSchema tableSchema;
private final CoreOptions options;
private final SnapshotManager snapshotManager;
+ private final ConsumerManager consumerManager;
private final SplitGenerator splitGenerator;
private final BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer;
@@ -68,10 +71,17 @@ public class SnapshotSplitReaderImpl implements SnapshotSplitReader {
this.tableSchema = tableSchema;
this.options = options;
this.snapshotManager = snapshotManager;
+ this.consumerManager =
+ new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
this.splitGenerator = splitGenerator;
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return consumerManager;
+ }
+
@Override
public SnapshotSplitReader withSnapshot(long snapshotId) {
scan.withSnapshot(snapshotId);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 016e5b961..27d4efbb4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
@@ -184,6 +185,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
this.snapshotSplitReader = snapshotSplitReader;
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return snapshotSplitReader.consumerManager();
+ }
+
public SnapshotSplitReader withSnapshot(long snapshotId) {
snapshotSplitReader.withSnapshot(snapshotId);
return this;
@@ -267,6 +273,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
public void restore(@Nullable Long nextSnapshotId) {
streamScan.restore(nextSnapshotId);
}
+
+ @Override
+ public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
+ streamScan.notifyCheckpointComplete(nextSnapshot);
+ }
}
private class AuditLogRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 077fe3cd0..f9f84fd55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -55,6 +55,14 @@ public class SnapshotManager implements Serializable {
this.tablePath = tablePath;
}
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
+ public Path tablePath() {
+ return tablePath;
+ }
+
public Path snapshotDirectory() {
return new Path(tablePath + "/snapshot");
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
new file mode 100644
index 000000000..572055f17
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.consumer;
+
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ConsumerManager}. */
+public class ConsumerManagerTest {
+
+ @TempDir Path tempDir;
+
+ private ConsumerManager manager;
+
+ @BeforeEach
+ public void before() {
+ this.manager =
+ new ConsumerManager(
+ LocalFileIO.create(), new org.apache.paimon.fs.Path(tempDir.toUri()));
+ }
+
+ @Test
+ public void test() {
+ Optional<Consumer> consumer = manager.consumer("id1");
+ assertThat(consumer).isEmpty();
+
+ assertThat(manager.minNextSnapshot()).isEmpty();
+
+ manager.recordConsumer("id1", new Consumer(5));
+ consumer = manager.consumer("id1");
+ assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(5L);
+
+ manager.recordConsumer("id2", new Consumer(8));
+ consumer = manager.consumer("id2");
+ assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);
+
+ assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index bc27cef46..d23132239 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -47,8 +47,11 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -536,6 +539,66 @@ public abstract class FileStoreTableTestBase {
"1|40|400|binary|varbinary|mapKey:mapVal|multiset|4000"));
}
+ @Test
+ public void testConsumeId() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set(CoreOptions.CONSUMER_ID, "my_id");
+ options.set(SNAPSHOT_NUM_RETAINED_MIN, 3);
+ options.set(SNAPSHOT_NUM_RETAINED_MAX, 3);
+ });
+
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = writeBuilder.newWrite();
+ StreamTableCommit commit = writeBuilder.newCommit();
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ StreamTableScan scan = readBuilder.newStreamScan();
+ TableRead read = readBuilder.newRead();
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ List<String> result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("+1|10|100|binary|varbinary|mapKey:mapVal|multiset");
+
+ write.write(rowData(1, 20, 200L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("+1|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+ // checkpoint and notifyCheckpointComplete
+ Long nextSnapshot = scan.checkpoint();
+ scan.notifyCheckpointComplete(nextSnapshot);
+
+ write.write(rowData(1, 30, 300L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // read again using the consumer id
+ scan = readBuilder.newStreamScan();
+ assertThat(scan.plan().splits()).isEmpty();
+ result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("+1|30|300|binary|varbinary|mapKey:mapVal|multiset");
+
+ // test snapshot expiration
+ for (int i = 3; i <= 8; i++) {
+ write.write(rowData(1, (i + 1) * 10, (i + 1) * 100L));
+ commit.commit(i, write.prepareCommit(true, i));
+ }
+
+ // snapshots the consumer depends on are not expired
+ result = getResult(read, scan.plan().splits(), STREAMING_ROW_TO_STRING);
+ assertThat(result)
+ .containsExactlyInAnyOrder("+1|40|400|binary|varbinary|mapKey:mapVal|multiset");
+
+ write.close();
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
index f57fd6349..6e71b3292 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
@@ -39,12 +39,17 @@ import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
/** Convert to Flink row data. */
public class FlinkRowData implements RowData {
- private final InternalRow row;
+ private InternalRow row;
public FlinkRowData(InternalRow row) {
this.row = row;
}
+ public FlinkRowData replace(InternalRow row) {
+ this.row = row;
+ return this;
+ }
+
@Override
public int getArity() {
return row.getFieldCount();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index b24967965..0002cb91b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -39,7 +39,7 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
@@ -208,7 +208,7 @@ public class DataTableSource extends FlinkTableSource
!streaming, env -> configureSource(sourceBuilder, env));
}
- private DataStreamSource<RowData> configureSource(
+ private DataStream<RowData> configureSource(
FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
Options options = Options.fromMap(this.table.options());
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 0895c7389..2159192b1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -24,16 +24,20 @@ import org.apache.paimon.CoreOptions.StreamingReadMode;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -122,23 +126,55 @@ public class FlinkSourceBuilder {
return this;
}
- private StaticFileStoreSource buildStaticFileSource() {
- return new StaticFileStoreSource(
- table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
- limit,
- Options.fromMap(table.options())
- .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
- splits);
+ private ReadBuilder createReadBuilder() {
+ return table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
}
- private ContinuousFileStoreSource buildContinuousFileSource() {
- return new ContinuousFileStoreSource(
- table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
- table.options(),
- limit);
+ private DataStream<RowData> buildStaticFileSource() {
+ return toDataStream(
+ new StaticFileStoreSource(
+ createReadBuilder(),
+ limit,
+ Options.fromMap(table.options())
+ .get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+ splits));
}
- public Source<RowData, ?, ?> buildSource() {
+ private DataStream<RowData> buildContinuousFileSource() {
+ return toDataStream(
+ new ContinuousFileStoreSource(createReadBuilder(), table.options(), limit));
+ }
+
+ private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
+ DataStreamSource<RowData> dataStream =
+ env.fromSource(
+ source,
+ watermarkStrategy == null
+ ? WatermarkStrategy.noWatermarks()
+ : watermarkStrategy,
+ tableIdentifier.asSummaryString(),
+ produceTypeInfo());
+ if (parallelism != null) {
+ dataStream.setParallelism(parallelism);
+ }
+ return dataStream;
+ }
+
+ private TypeInformation<RowData> produceTypeInfo() {
+ RowType rowType = toLogicalType(table.rowType());
+ LogicalType produceType =
+ Optional.ofNullable(projectedFields)
+ .map(Projection::of)
+ .map(p -> p.project(rowType))
+ .orElse(rowType);
+ return InternalTypeInfo.of(produceType);
+ }
+
+ public DataStream<RowData> build() {
+ if (env == null) {
+ throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
+ }
+
if (isContinuous) {
TableScanUtils.streamingReadingValidate(table);
@@ -148,44 +184,47 @@ public class FlinkSourceBuilder {
if (logSourceProvider != null && streamingReadMode != FILE) {
if (startupMode != StartupMode.LATEST_FULL) {
- return logSourceProvider.createSource(null);
+ return toDataStream(logSourceProvider.createSource(null));
+ } else {
+ return toDataStream(
+ HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
+ LogHybridSourceFactory.buildHybridFirstSource(
+ table, projectedFields, predicate))
+ .addSource(
+ new LogHybridSourceFactory(logSourceProvider),
+ Boundedness.CONTINUOUS_UNBOUNDED)
+ .build());
}
- return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
- LogHybridSourceFactory.buildHybridFirstSource(
- table, projectedFields, predicate))
- .addSource(
- new LogHybridSourceFactory(logSourceProvider),
- Boundedness.CONTINUOUS_UNBOUNDED)
- .build();
} else {
- return buildContinuousFileSource();
+ if (conf.contains(CoreOptions.CONSUMER_ID)) {
+ return buildContinuousStreamOperator();
+ } else {
+ return buildContinuousFileSource();
+ }
}
} else {
return buildStaticFileSource();
}
}
- public DataStreamSource<RowData> build() {
- if (env == null) {
- throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
+ private DataStream<RowData> buildContinuousStreamOperator() {
+ DataStream<RowData> dataStream;
+ if (limit != null) {
+ throw new IllegalArgumentException(
+ "Cannot limit streaming source, please use batch execution mode.");
}
-
- RowType rowType = toLogicalType(table.rowType());
- LogicalType produceType =
- Optional.ofNullable(projectedFields)
- .map(Projection::of)
- .map(p -> p.project(rowType))
- .orElse(rowType);
- DataStreamSource<RowData> dataStream =
- env.fromSource(
- buildSource(),
- watermarkStrategy == null
- ? WatermarkStrategy.noWatermarks()
- : watermarkStrategy,
+ dataStream =
+ MonitorFunction.buildSource(
+ env,
tableIdentifier.asSummaryString(),
- InternalTypeInfo.of(produceType));
+ produceTypeInfo(),
+ createReadBuilder(),
+ conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
if (parallelism != null) {
- dataStream.setParallelism(parallelism);
+ dataStream.getTransformation().setParallelism(parallelism);
+ }
+ if (watermarkStrategy != null) {
+ dataStream = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);
}
return dataStream;
}
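
To summarize the dispatch in `build()` after this refactoring: batch reads go through `StaticFileStoreSource`; streaming reads backed by a log source keep the log (or hybrid log/file) source; streaming reads with `consumer-id` set are routed to the new `MonitorFunction`/`ReadOperator` topology, which can commit consumer progress on `notifyCheckpointComplete`; all remaining streaming reads keep `ContinuousFileStoreSource`.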
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
new file mode 100644
index 000000000..0e8688ba9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.OptionalLong;
+import java.util.TreeMap;
+
+/**
+ * This is the single (non-parallel) monitoring task, which is responsible for:
+ *
+ * <ol>
+ * <li>Monitoring snapshots of the Paimon table.
+ * <li>Creating the {@link Split splits} corresponding to the incremental files.
+ * <li>Assigning them to downstream tasks for further processing.
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link ReadOperator} which can have
+ * parallelism greater than one.
+ */
+public class MonitorFunction extends RichSourceFunction<Split>
+ implements CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MonitorFunction.class);
+
+ private final ReadBuilder readBuilder;
+ private final long monitorInterval;
+
+ private volatile boolean isRunning = true;
+
+ private transient StreamTableScan scan;
+ private transient SourceContext<Split> ctx;
+
+ private transient ListState<Long> checkpointState;
+ private transient ListState<Tuple2<Long, Long>> nextSnapshotState;
+ private transient TreeMap<Long, Long> nextSnapshotPerCheckpoint;
+
+ public MonitorFunction(ReadBuilder readBuilder, long monitorInterval) {
+ this.readBuilder = readBuilder;
+ this.monitorInterval = monitorInterval;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.scan = readBuilder.newStreamScan();
+
+ this.checkpointState =
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ "next-snapshot", LongSerializer.INSTANCE));
+
+ @SuppressWarnings("unchecked")
+ final Class<Tuple2<Long, Long>> typedTuple =
+ (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class;
+ this.nextSnapshotState =
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ "next-snapshot-per-checkpoint",
+ new TupleSerializer<>(
+ typedTuple,
+ new TypeSerializer[] {
+ LongSerializer.INSTANCE, LongSerializer.INSTANCE
+ })));
+
+ this.nextSnapshotPerCheckpoint = new TreeMap<>();
+
+ if (context.isRestored()) {
+ LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+ List<Long> retrievedStates = new ArrayList<>();
+ for (Long entry : this.checkpointState.get()) {
+ retrievedStates.add(entry);
+ }
+
+ // given that the parallelism of the function is 1, we can have at most 1 retrieved item.
+
+ Preconditions.checkArgument(
+ retrievedStates.size() <= 1,
+ getClass().getSimpleName() + " retrieved invalid state.");
+
+ if (retrievedStates.size() == 1) {
+ this.scan.restore(retrievedStates.get(0));
+ }
+
+ for (Tuple2<Long, Long> tuple2 : nextSnapshotState.get()) {
+ nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1);
+ }
+ } else {
+ LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+ this.checkpointState.clear();
+ Long nextSnapshot = this.scan.checkpoint();
+ if (nextSnapshot != null) {
+ this.checkpointState.add(nextSnapshot);
+ this.nextSnapshotPerCheckpoint.put(ctx.getCheckpointId(), nextSnapshot);
+ }
+
+ List<Tuple2<Long, Long>> nextSnapshots = new ArrayList<>();
+ this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v)));
+ this.nextSnapshotState.update(nextSnapshots);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
+ }
+ }
+
+ @SuppressWarnings("BusyWait")
+ @Override
+ public void run(SourceContext<Split> ctx) throws Exception {
+ this.ctx = ctx;
+ while (isRunning) {
+ boolean isEmpty;
+ synchronized (ctx.getCheckpointLock()) {
+ if (!isRunning) {
+ return;
+ }
+ try {
+ List<Split> splits = scan.plan().splits();
+ isEmpty = splits.isEmpty();
+ splits.forEach(ctx::collect);
+ } catch (EndOfScanException esf) {
+ LOG.info("Catching EndOfStreamException, the stream is finished.");
+ return;
+ }
+ }
+
+ if (isEmpty) {
+ Thread.sleep(monitorInterval);
+ }
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ NavigableMap<Long, Long> nextSnapshots =
+ nextSnapshotPerCheckpoint.headMap(checkpointId, true);
+ OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
+ max.ifPresent(scan::notifyCheckpointComplete);
+ nextSnapshots.clear();
+ }
+
+ @Override
+ public void cancel() {
+ // this is to cover the case where cancel() is called before the run()
+ if (ctx != null) {
+ synchronized (ctx.getCheckpointLock()) {
+ isRunning = false;
+ }
+ } else {
+ isRunning = false;
+ }
+ }
+
+ public static DataStream<RowData> buildSource(
+ StreamExecutionEnvironment env,
+ String name,
+ TypeInformation<RowData> typeInfo,
+ ReadBuilder readBuilder,
+ long monitorInterval) {
+ return env.addSource(
+ new MonitorFunction(readBuilder, monitorInterval),
+ name + "-Monitor",
+ new JavaTypeInfo<>(Split.class))
+ .forceNonParallel()
+ .partitionCustom(
+ (key, numPartitions) -> key % numPartitions,
+ split -> ((DataSplit) split).bucket())
+ .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
+ }
+}
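
Note how the two operator states cooperate: `next-snapshot` holds the single next snapshot id so that `restore` can resume planning after a failure, while `next-snapshot-per-checkpoint` maps checkpoint id to next snapshot id so that `notifyCheckpointComplete` can persist exactly the progress covered by the completed checkpoint (the maximum over all entries up to that checkpoint id) into the consumer file.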
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
new file mode 100644
index 000000000..f33e31b5f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.utils.CloseableIterator;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * The operator that reads the {@link Split splits} received from the preceding {@link
+ * MonitorFunction}. Contrary to the {@link MonitorFunction} which has a parallelism of 1, this
+ * operator can have DOP > 1.
+ */
+public class ReadOperator extends AbstractStreamOperator<RowData>
+ implements OneInputStreamOperator<Split, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ReadBuilder readBuilder;
+
+ private transient TableRead read;
+ private transient StreamRecord<RowData> reuseRecord;
+ private transient FlinkRowData reuseRow;
+
+ public ReadOperator(ReadBuilder readBuilder) {
+ this.readBuilder = readBuilder;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.read = readBuilder.newRead();
+ this.reuseRow = new FlinkRowData(null);
+ this.reuseRecord = new StreamRecord<>(reuseRow);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Split> record) throws Exception {
+ try (CloseableIterator<InternalRow> iterator =
+ read.createReader(record.getValue()).toCloseableIterator()) {
+ while (iterator.hasNext()) {
+ reuseRow.replace(iterator.next());
+ output.collect(reuseRecord);
+ }
+ }
+ }
+}
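
This reader is also why `FlinkRowData.row` is made mutable earlier in this diff: `open()` allocates a single `FlinkRowData` and `StreamRecord`, and `processElement` merely swaps the underlying `InternalRow` via `replace`, so the hot read loop emits rows without per-record allocations.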
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaSerializer.java
new file mode 100644
index 000000000..4fe1c82e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaSerializer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeutils.GenericTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A type serializer that serializes its type using the Java serialization. */
+public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+ private static final long serialVersionUID = 3L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(JavaSerializer.class);
+
+ private final Class<T> type;
+
+ public JavaSerializer(Class<T> type) {
+ this.type = checkNotNull(type);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public JavaSerializer<T> duplicate() {
+ return new JavaSerializer<>(type);
+ }
+
+ @Override
+ public T createInstance() {
+ try {
+ return type.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public T copy(T from) {
+ try {
+ return InstantiationUtil.clone(from);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ try {
+ return InstantiationUtil.deserializeObject(
+ new DataInputViewStream(source), getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ serialize(deserialize(source), target);
+ }
+
+ @Override
+ public int hashCode() {
+ return type.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof JavaSerializer) {
+ JavaSerializer<?> other = (JavaSerializer<?>) obj;
+
+ return type == other.type;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public TypeSerializerSnapshot<T> snapshotConfiguration() {
+ return new JavaSerializerSnapshot<>(type);
+ }
+
+ /** {@link JavaSerializer} snapshot class. */
+ public static final class JavaSerializerSnapshot<T extends Serializable>
+ extends GenericTypeSerializerSnapshot<T, JavaSerializer> {
+
+ @SuppressWarnings("unused")
+ public JavaSerializerSnapshot() {}
+
+ JavaSerializerSnapshot(Class<T> typeClass) {
+ super(typeClass);
+ }
+
+ @Override
+ protected TypeSerializer<T> createSerializer(Class<T> typeClass) {
+ return new JavaSerializer<>(typeClass);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Class<T> getTypeClass(JavaSerializer serializer) {
+ return serializer.type;
+ }
+
+ @Override
+ protected Class<?> serializerClass() {
+ return JavaSerializer.class;
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
new file mode 100644
index 000000000..a36243c5b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link TypeInformation} which uses java serialization. */
+public class JavaTypeInfo<T extends Serializable> extends TypeInformation<T>
+ implements AtomicType<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Class<T> typeClass;
+
+ public JavaTypeInfo(Class<T> typeClass) {
+ this.typeClass = checkNotNull(typeClass);
+ }
+
+ @Override
+ @PublicEvolving
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ @PublicEvolving
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ @PublicEvolving
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ @PublicEvolving
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ @PublicEvolving
+ public Class<T> getTypeClass() {
+ return typeClass;
+ }
+
+ @Override
+ @PublicEvolving
+ public boolean isKeyType() {
+ return Comparable.class.isAssignableFrom(typeClass);
+ }
+
+ @Override
+ public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ return new JavaSerializer<>(this.typeClass);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeComparator<T> createComparator(
+ boolean sortOrderAscending, ExecutionConfig executionConfig) {
+ if (isKeyType()) {
+ @SuppressWarnings("rawtypes")
+ GenericTypeComparator comparator =
+ new GenericTypeComparator(
+ sortOrderAscending, createSerializer(executionConfig), this.typeClass);
+ return (TypeComparator<T>) comparator;
+ }
+
+ throw new UnsupportedOperationException(
+ "Types that do not implement java.lang.Comparable cannot be used as keys.");
+ }
+
+ @Override
+ public int hashCode() {
+ return typeClass.hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof JavaTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof JavaTypeInfo) {
+ @SuppressWarnings("unchecked")
+ JavaTypeInfo<T> javaTypeInfo = (JavaTypeInfo<T>) obj;
+
+ return typeClass == javaTypeInfo.typeClass;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JavaType<" + typeClass.getCanonicalName() + ">";
+ }
+}
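
For orientation, here is a minimal usage sketch of the new type info. Only JavaTypeInfo and JavaSerializer come from this commit; the event class and the main method are hypothetical. Any Serializable class can be wrapped and handed to Flink wherever a TypeInformation is expected, and Flink derives the Java-serialization-based serializer from it:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.paimon.flink.utils.JavaTypeInfo;

import java.io.Serializable;

public class JavaTypeInfoUsage {

    // Hypothetical payload; anything Serializable works.
    static class MyEvent implements Serializable {
        long id;
    }

    public static void main(String[] args) {
        // Wrap the Serializable class in the new TypeInformation.
        JavaTypeInfo<MyEvent> typeInfo = new JavaTypeInfo<>(MyEvent.class);

        // createSerializer (above) hands back a JavaSerializer for it.
        TypeSerializer<MyEvent> serializer =
                typeInfo.createSerializer(new ExecutionConfig());

        System.out.println(typeInfo); // JavaType<...MyEvent>
        System.out.println(serializer.getClass().getSimpleName()); // JavaSerializer
    }
}
```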
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index df2fddbf7..c572d0309 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -92,6 +92,30 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
testProjection("T2");
}
+ @TestTemplate
+ public void testConsumerId() throws Exception {
+ String table = "T2";
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
+
+ batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+
+ // Give the job a moment to record its consume progress before closing.
+ Thread.sleep(1000);
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM %s /*+ OPTIONS('consumer-id'='me') */", table));
+ batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", "8", "9"));
+ iterator.close();
+ }
+
private void testSimple(String table) throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 1e17967e0..05070c5e9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -37,9 +37,11 @@ import org.apache.paimon.utils.FailingFileIO;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.GenericRowData;
@@ -326,12 +328,15 @@ public class FileStoreITCase extends AbstractTestBase {
table =
table.copy(
Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024"));
- Source<RowData, ?, ?> source =
+ DataStream<RowData> source =
new FlinkSourceBuilder(IDENTIFIER, table)
.withContinuousMode(true)
.withEnv(env)
- .buildSource();
- assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+ .build();
+ Transformation<RowData> transformation = source.getTransformation();
+ assertThat(transformation).isInstanceOf(SourceTransformation.class);
+ assertThat(((SourceTransformation<?, ?, ?>) transformation).getSource().getBoundedness())
+ .isEqualTo(Boundedness.BOUNDED);
}
private void innerTestContinuous(FileStoreTable table) throws Exception {
@@ -449,12 +454,12 @@ public class FileStoreITCase extends AbstractTestBase {
InternalTypeInfo.of(TABLE_TYPE));
}
- public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
+ public static List<Row> executeAndCollect(DataStream<RowData> source) throws Exception {
return executeAndCollect(source, CONVERTER);
}
public static List<Row> executeAndCollect(
- DataStreamSource<RowData> source, DataStructureConverter<RowData, Row> converter)
+ DataStream<RowData> source, DataStructureConverter<RowData, Row> converter)
throws Exception {
CloseableIterator<RowData> iterator = source.executeAndCollect();
List<Row> results = new ArrayList<>();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 2ecd7f54e..a0ab1fc41 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -318,6 +319,9 @@ public class ContinuousFileSplitEnumeratorTest {
return null;
}
+ @Override
+ public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
+
@Override
public void restore(Long state) {}
}
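
The notifyCheckpointComplete(@Nullable Long) override above targets the hook this commit adds to StreamTableScan. A hedged sketch of the intended cycle, inferred from the interface's javadoc (the harness class and method names around the scan are illustrative, not from the commit): checkpoint() yields the next snapshot id, the caller persists it with the Flink checkpoint, and once that checkpoint is confirmed the scan records the consumer's progress so snapshot expiration can respect it.

```java
import org.apache.paimon.table.source.StreamTableScan;

/** Illustrative checkpoint cycle around a stream scan. */
public class ConsumerCheckpointCycle {

    /** Runs one checkpoint round; the caller persists the returned id. */
    static Long checkpointOnce(StreamTableScan scan) {
        // checkpoint() returns the next snapshot id to be consumed.
        return scan.checkpoint();
    }

    /** Invoked once the corresponding checkpoint is confirmed durable. */
    static void onCheckpointConfirmed(StreamTableScan scan, Long nextSnapshot) {
        // Advances the consumer so snapshot expiration will not drop
        // snapshots this consumer still needs.
        scan.notifyCheckpointComplete(nextSnapshot);
    }
}
```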
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
new file mode 100644
index 000000000..91e4d4e3a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.function.SupplierWithException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.paimon.CoreOptions.CONSUMER_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MonitorFunction} and {@link ReadOperator}. */
+public class OperatorSourceTest {
+
+ @TempDir Path tempDir;
+
+ private Table table;
+
+ @BeforeEach
+ public void before()
+ throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException,
+ Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
+ Catalog catalog =
+ CatalogFactory.createCatalog(
+ CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri())));
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .primaryKey("a")
+ .options(Collections.singletonMap(CONSUMER_ID.key(), "my_consumer"))
+ .build();
+ Identifier identifier = Identifier.create("default", "t");
+ catalog.createDatabase("default", false);
+ catalog.createTable(identifier, schema, false);
+ this.table = catalog.getTable(identifier);
+ }
+
+ private void writeToTable(int a, int b, int c) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ write.write(GenericRow.of(a, b, c));
+ writeBuilder.newCommit().commit(write.prepareCommit());
+ write.close();
+ }
+
+ private List<List<Integer>> readSplit(Split split) throws IOException {
+ TableRead read = table.newReadBuilder().newRead();
+ List<List<Integer>> result = new ArrayList<>();
+ read.createReader(split)
+ .forEachRemaining(
+ row ->
+ result.add(
+ Arrays.asList(
+ row.getInt(0), row.getInt(1), row.getInt(2))));
+ return result;
+ }
+
+ @Test
+ public void testMonitorFunction() throws Exception {
+ // 1. run first
+ OperatorSubtaskState snapshot;
+ {
+ MonitorFunction function = new MonitorFunction(table.newReadBuilder(), 10);
+ StreamSource<Split, MonitorFunction> src = new StreamSource<>(function);
+ AbstractStreamOperatorTestHarness<Split> testHarness =
+ new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+ testHarness.open();
+ snapshot = testReadSplit(function, () -> testHarness.snapshot(0, 0), 1, 1, 1);
+ }
+
+ // 2. restore from state
+ {
+ MonitorFunction functionCopy1 = new MonitorFunction(table.newReadBuilder(), 10);
+ StreamSource<Split, MonitorFunction> srcCopy1 = new StreamSource<>(functionCopy1);
+ AbstractStreamOperatorTestHarness<Split> testHarnessCopy1 =
+ new AbstractStreamOperatorTestHarness<>(srcCopy1, 1, 1, 0);
+ testHarnessCopy1.initializeState(snapshot);
+ testHarnessCopy1.open();
+ testReadSplit(
+ functionCopy1,
+ () -> {
+ testHarnessCopy1.snapshot(1, 1);
+ testHarnessCopy1.notifyOfCompletedCheckpoint(1);
+ return null;
+ },
+ 2,
+ 2,
+ 2);
+ }
+
+ // 3. restore from consumer id
+ {
+ MonitorFunction functionCopy2 = new MonitorFunction(table.newReadBuilder(), 10);
+ StreamSource<Split, MonitorFunction> srcCopy2 = new StreamSource<>(functionCopy2);
+ AbstractStreamOperatorTestHarness<Split> testHarnessCopy2 =
+ new AbstractStreamOperatorTestHarness<>(srcCopy2, 1, 1, 0);
+ testHarnessCopy2.open();
+ testReadSplit(functionCopy2, () -> null, 3, 3, 3);
+ }
+ }
+
+ @Test
+ public void testReadOperator() throws Exception {
+ ReadOperator readOperator = new ReadOperator(table.newReadBuilder());
+ OneInputStreamOperatorTestHarness<Split, RowData> harness =
+ new OneInputStreamOperatorTestHarness<>(readOperator);
+ harness.setup(
+ InternalSerializers.create(
+ RowType.of(new IntType(), new IntType(), new IntType())));
+ writeToTable(1, 1, 1);
+ writeToTable(2, 2, 2);
+ List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+ harness.open();
+ for (Split split : splits) {
+ harness.processElement(new StreamRecord<>(split));
+ }
+ ArrayList<Object> values = new ArrayList<>(harness.getOutput());
+ assertThat(values)
+ .containsExactlyInAnyOrder(
+ new StreamRecord<>(GenericRowData.of(1, 1, 1)),
+ new StreamRecord<>(GenericRowData.of(2, 2, 2)));
+ }
+
+ private <T> T testReadSplit(
+ MonitorFunction function,
+ SupplierWithException<T, Exception> beforeClose,
+ int a,
+ int b,
+ int c)
+ throws Exception {
+ Throwable[] error = new Throwable[1];
+ ArrayBlockingQueue<Split> queue = new ArrayBlockingQueue<>(10);
+
+ DummySourceContext sourceContext =
+ new DummySourceContext() {
+ @Override
+ public void collect(Split element) {
+ queue.add(element);
+ }
+ };
+
+ Thread runner =
+ new Thread(
+ () -> {
+ try {
+ function.run(sourceContext);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ error[0] = t;
+ }
+ });
+ runner.start();
+
+ writeToTable(a, b, c);
+
+ Split split = queue.poll(1, TimeUnit.MINUTES);
+ assertThat(readSplit(split)).containsExactlyInAnyOrder(Arrays.asList(a, b, c));
+
+ T t = beforeClose.get();
+ function.cancel();
+ runner.join();
+
+ assertThat(error[0]).isNull();
+
+ return t;
+ }
+
+ private abstract static class DummySourceContext
+ implements SourceFunction.SourceContext<Split> {
+
+ private final Object lock = new Object();
+
+ @Override
+ public void collectWithTimestamp(Split element, long timestamp) {}
+
+ @Override
+ public void emitWatermark(Watermark mark) {}
+
+ @Override
+ public void markAsTemporarilyIdle() {}
+
+ @Override
+ public Object getCheckpointLock() {
+ return lock;
+ }
+
+ @Override
+ public void close() {}
+ }
+}
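
The test drives both operators through harnesses in isolation. For orientation, a hedged sketch of how they might compose into a topology (the wiring is an assumption inferred from the test; in this commit FlinkSourceBuilder performs the real assembly, and the 10 ms interval and int-typed schema mirror the test above):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

public class MonitorReadPipeline {

    static DataStream<RowData> build(StreamExecutionEnvironment env, Table table) {
        // The monitor runs with parallelism 1: it scans for new
        // snapshots and emits their splits (every 10 ms here).
        DataStream<Split> splits =
                env.addSource(new MonitorFunction(table.newReadBuilder(), 10))
                        .returns(new JavaTypeInfo<>(Split.class))
                        .setParallelism(1);

        // ReadOperator turns each incoming split into its rows.
        return splits.transform(
                "Read",
                InternalTypeInfo.of(
                        RowType.of(new IntType(), new IntType(), new IntType())),
                new ReadOperator(table.newReadBuilder()));
    }
}
```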
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaSerializerTest.java
new file mode 100644
index 000000000..08bc0c445
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaSerializerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Random;
+
+/** Test for {@link JavaSerializer}. */
+public class JavaSerializerTest extends SerializerTestBase<JavaSerializerTest.TestClass> {
+
+ @Override
+ protected TypeSerializer<TestClass> createSerializer() {
+ return new JavaSerializer<>(TestClass.class);
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<TestClass> getTypeClass() {
+ return TestClass.class;
+ }
+
+ @Override
+ protected TestClass[] getTestData() {
+ Random rnd = new Random();
+ int rndInt = rnd.nextInt();
+
+ Integer[] integers =
+ new Integer[] {0, 1, -1, Integer.MAX_VALUE, Integer.MIN_VALUE, rndInt, -rndInt};
+ TestClass[] testClasses = new TestClass[integers.length];
+ for (int i = 0; i < integers.length; i++) {
+ testClasses[i] = new TestClass(integers[i]);
+ }
+ return testClasses;
+ }
+
+ static class TestClass implements Serializable {
+ int v;
+
+ public TestClass() {}
+
+ public TestClass(int v) {
+ this.v = v;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestClass testClass = (TestClass) o;
+ return v == testClass.v;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(v);
+ }
+ }
+}
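
Outside the SerializerTestBase machinery, a quick hedged round-trip sketch (only JavaSerializer is from this commit; DataOutputSerializer and DataInputDeserializer are standard Flink in-memory views, and the payload class is hypothetical):

```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import org.apache.paimon.flink.utils.JavaSerializer;

import java.io.IOException;
import java.io.Serializable;

public class JavaSerializerRoundTrip {

    static class Payload implements Serializable {
        final int v;

        Payload(int v) {
            this.v = v;
        }
    }

    public static void main(String[] args) throws IOException {
        JavaSerializer<Payload> serializer = new JavaSerializer<>(Payload.class);

        // Serialize into an in-memory buffer.
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(new Payload(42), out);

        // Deserialize from the written bytes.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        Payload copy = serializer.deserialize(in);

        System.out.println(copy.v); // 42
    }
}
```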
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaTypeInfoTest.java
similarity index 58%
copy from paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
copy to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaTypeInfoTest.java
index 44f0ce43a..5294f5bd9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/JavaTypeInfoTest.java
@@ -16,17 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.table.source;
+package org.apache.paimon.flink.utils;
-import org.apache.paimon.annotation.Public;
-import org.apache.paimon.utils.Restorable;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
-/**
- * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
- *
- * <p>NOTE: {@link #checkpoint} will return the next snapshot id.
- *
- * @since 0.4.0
- */
-@Public
-public interface StreamTableScan extends TableScan, Restorable<Long> {}
+import java.io.Serializable;
+
+/** Test for {@link JavaTypeInfo}. */
+class JavaTypeInfoTest extends TypeInformationTestBase<JavaTypeInfo<?>> {
+
+ @Override
+ protected JavaTypeInfo<?>[] getTestData() {
+ return new JavaTypeInfo<?>[] {
+ new JavaTypeInfo<>(TestClass.class), new JavaTypeInfo<>(AlternativeClass.class)
+ };
+ }
+
+ static class TestClass implements Serializable {}
+
+ static class AlternativeClass implements Serializable {}
+}