You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/15 14:20:02 UTC
[flink-table-store] branch master updated: [FLINK-26475] FileStoreScan supports incremental consuming
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbffef [FLINK-26475] FileStoreScan supports incremental consuming
7bbffef is described below
commit 7bbffef4fb26048500d37dfc42bdf478df0de73e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Mar 15 22:14:34 2022 +0800
[FLINK-26475] FileStoreScan supports incremental consuming
This closes #43
---
.../flink/table/store/connector/TableStore.java | 16 +-
.../source/ContinuousFileSplitEnumerator.java | 239 +++++++++++++++++++++
.../store/connector/source/FileStoreSource.java | 61 +++++-
.../source/StaticFileStoreSplitEnumerator.java | 10 -
.../table/store/connector/FileStoreITCase.java | 77 ++++++-
.../table/store/connector/FiniteTestSource.java | 23 +-
.../store/connector/ReadWriteTableITCase.java | 14 +-
.../flink/table/store/file/FileStoreOptions.java | 6 +
.../table/store/file/operation/FileStoreScan.java | 4 +
.../store/file/operation/FileStoreScanImpl.java | 26 ++-
10 files changed, 425 insertions(+), 51 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 534cbe7..ac8a3a0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -59,6 +59,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
/** A table store api to create source and sink. */
@Experimental
@@ -207,11 +208,17 @@ public class TableStore {
return this;
}
- private FileStoreSource buildFileStoreSource() {
+    /** Reads {@code CONTINUOUS_DISCOVERY_INTERVAL} from the options and converts it to milliseconds. */
+    private long discoveryIntervalMills() {
+        final long intervalMillis = options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
+        return intervalMillis;
+    }
+
+ private FileStoreSource buildFileSource(boolean isContinuous) {
FileStore fileStore = buildFileStore();
return new FileStoreSource(
fileStore,
primaryKeys.length == 0,
+ isContinuous,
+ discoveryIntervalMills(),
projectedFields,
partitionPredicate,
fieldPredicate);
@@ -220,15 +227,14 @@ public class TableStore {
public Source<RowData, ?, ?> build() {
if (isContinuous) {
if (logSourceProvider == null) {
- throw new UnsupportedOperationException(
- "File store continuous mode is not supported yet.");
+ return buildFileSource(true);
}
// TODO project log source
if (isHybrid) {
return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
- buildFileStoreSource())
+ buildFileSource(false))
.addSource(
new LogHybridSourceFactory(logSourceProvider),
Boundedness.CONTINUOUS_UNBOUNDED)
@@ -237,7 +243,7 @@ public class TableStore {
return logSourceProvider.createSource(null);
}
} else {
- return buildFileStoreSource();
+ return buildFileSource(false);
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
new file mode 100644
index 0000000..35e42cf
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A continuously monitoring enumerator.
+ *
+ * <p>It starts from the given pending splits and snapshot id. Then, every {@code
+ * discoveryInterval} milliseconds, it asks the {@link FileStoreScan} for the next APPEND snapshot
+ * and queues the splits planned for that snapshot per bucket. Splits are only handed to readers
+ * that requested one. All splits of a bucket go to the same subtask ({@code bucket %
+ * parallelism}) so that each bucket is consumed in order.
+ */
+public class ContinuousFileSplitEnumerator
+        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
+
+    private final FileStoreScan scan;
+
+    /** Splits not assigned yet, grouped by bucket and kept in consumption order. */
+    private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
+
+    /** Delay between two snapshot discoveries, in milliseconds. */
+    private final long discoveryInterval;
+
+    /** Subtasks that have requested a split and are still waiting for one. */
+    private final Set<Integer> readersAwaitingSplit;
+
+    private final FileStoreSourceSplitGenerator splitGenerator;
+
+    private final SnapshotEnumerator snapshotEnumerator;
+
+    /** Id of the latest snapshot whose splits have been queued; written into checkpoints. */
+    private Long currentSnapshotId;
+
+    /**
+     * Creates an enumerator that continues after snapshot {@code currentSnapshotId}.
+     *
+     * @param context enumerator context used to assign splits and schedule discovery
+     * @param scan scan used to look up and plan snapshots
+     * @param remainSplits splits still to be assigned, e.g. restored from a checkpoint
+     * @param currentSnapshotId last snapshot already covered; discovery starts at the next one
+     * @param discoveryInterval discovery interval in milliseconds, must be positive
+     */
+    public ContinuousFileSplitEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            FileStoreScan scan,
+            Collection<FileStoreSourceSplit> remainSplits,
+            long currentSnapshotId,
+            long discoveryInterval) {
+        checkArgument(discoveryInterval > 0L);
+        this.context = checkNotNull(context);
+        this.scan = checkNotNull(scan);
+        this.bucketSplits = new HashMap<>();
+        addSplits(remainSplits);
+        this.currentSnapshotId = currentSnapshotId;
+        this.discoveryInterval = discoveryInterval;
+        this.readersAwaitingSplit = new HashSet<>();
+        this.splitGenerator = new FileStoreSourceSplitGenerator();
+        this.snapshotEnumerator = new SnapshotEnumerator(currentSnapshotId);
+    }
+
+    /** Appends the splits to the tail of their bucket queues, in iteration order. */
+    private void addSplits(Collection<FileStoreSourceSplit> splits) {
+        splits.forEach(this::addSplit);
+    }
+
+    private void addSplit(FileStoreSourceSplit split) {
+        bucketSplits.computeIfAbsent(split.bucket(), i -> new LinkedList<>()).add(split);
+    }
+
+    /** Schedules periodic snapshot discovery; the first run happens after one interval. */
+    @Override
+    public void start() {
+        context.callAsync(
+                snapshotEnumerator,
+                this::processDiscoveredSplits,
+                discoveryInterval,
+                discoveryInterval);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // no resources to close
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // this source is purely lazy-pull-based, nothing to do upon registration
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        readersAwaitingSplit.add(subtaskId);
+        assignSplits();
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        LOG.error("Received unrecognized event: {}", sourceEvent);
+    }
+
+    /**
+     * Re-queues splits that a failed reader had not checkpointed yet.
+     *
+     * <p>Those splits were assigned before any split still queued for the same bucket. They are
+     * therefore put in FRONT of the bucket queue, so that the per-bucket consumption order is
+     * preserved. NOTE(review): this assumes {@code splits} is given in assignment order; confirm
+     * against the Flink version in use.
+     */
+    @Override
+    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
+        LOG.debug("File Source Enumerator adds splits back: {}", splits);
+        Map<Integer, LinkedList<FileStoreSourceSplit>> returnedByBucket = new HashMap<>();
+        for (FileStoreSourceSplit split : splits) {
+            returnedByBucket.computeIfAbsent(split.bucket(), i -> new LinkedList<>()).add(split);
+        }
+        returnedByBucket.forEach(
+                (bucket, returned) -> {
+                    Queue<FileStoreSourceSplit> pending = bucketSplits.get(bucket);
+                    if (pending != null) {
+                        returned.addAll(pending);
+                    }
+                    bucketSplits.put(bucket, returned);
+                });
+    }
+
+    @Override
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        bucketSplits.values().forEach(splits::addAll);
+        final PendingSplitsCheckpoint checkpoint =
+                new PendingSplitsCheckpoint(
+                        splits, currentSnapshotId == null ? INVALID_SNAPSHOT : currentSnapshotId);
+
+        LOG.debug("Source Checkpoint is {}", checkpoint);
+        return checkpoint;
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Handles the outcome of one discovery run: queues the new splits and serves waiting readers.
+     *
+     * @throws RuntimeException wrapping {@code error} if discovery failed
+     */
+    private void processDiscoveredSplits(@Nullable EnumeratorResult result, Throwable error) {
+        if (error != null) {
+            // Do not swallow: the failing snapshot id is retried on every interval (e.g. an
+            // OVERWRITE snapshot throws each time), so logging alone would silently stall the
+            // stream. Rethrowing lets the coordinator fail the job.
+            throw new RuntimeException("Failed to enumerate files", error);
+        }
+
+        if (result == null) {
+            // no new snapshot yet
+            return;
+        }
+
+        currentSnapshotId = result.snapshotId;
+        addSplits(result.splits);
+        assignSplits();
+    }
+
+    /**
+     * Gives one pending split to each registered reader that is waiting for one.
+     *
+     * <p>The source is pull-based: readers that did not request a split get nothing pushed.
+     */
+    private void assignSplits() {
+        bucketSplits.forEach(
+                (bucket, splits) -> {
+                    if (splits.isEmpty()) {
+                        return;
+                    }
+                    // To ensure the order of consumption, the data of the same bucket is given
+                    // to a task to be consumed.
+                    int task = bucket % context.currentParallelism();
+                    if (!readersAwaitingSplit.remove(task)) {
+                        // this reader has no outstanding request
+                        return;
+                    }
+                    // if the reader that requested another split has failed in the meantime, it
+                    // has just been removed from the list of waiting readers; skip it
+                    if (context.registeredReaders().containsKey(task)) {
+                        context.assignSplit(splits.poll(), task);
+                    }
+                });
+    }
+
+    /** Looks for the next APPEND snapshot after the last one handed out. */
+    private class SnapshotEnumerator implements Callable<EnumeratorResult> {
+
+        private long nextSnapshotId;
+
+        private SnapshotEnumerator(long currentSnapshot) {
+            this.nextSnapshotId = currentSnapshot + 1;
+        }
+
+        /**
+         * Returns the splits of the next APPEND snapshot, skipping snapshots of other commit kinds
+         * except OVERWRITE.
+         *
+         * @return the discovered snapshot and its splits, or {@code null} if the next snapshot
+         *     does not exist yet
+         * @throws UnsupportedOperationException if an OVERWRITE snapshot is encountered
+         */
+        @Nullable
+        @Override
+        public EnumeratorResult call() {
+            // TODO sync with processDiscoveredSplits to avoid too many splits in memory
+            while (true) {
+                if (!scan.snapshotExists(nextSnapshotId)) {
+                    // TODO check latest snapshot id, expired?
+                    LOG.debug(
+                            "Next snapshot id {} not exists, wait for it to be generated.",
+                            nextSnapshotId);
+                    return null;
+                }
+
+                Snapshot snapshot = scan.snapshot(nextSnapshotId);
+                if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
+                    if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+                        throw new UnsupportedOperationException(
+                                "Invalid overwrite snapshot id: " + nextSnapshotId);
+                    }
+
+                    // log before advancing so the skipped snapshot's own id is reported
+                    LOG.debug(
+                            "Next snapshot id {} is not append, but is {}, check next one.",
+                            nextSnapshotId,
+                            snapshot.commitKind());
+                    nextSnapshotId++;
+                    continue;
+                }
+
+                List<FileStoreSourceSplit> splits =
+                        splitGenerator.createSplits(scan.withSnapshot(nextSnapshotId).plan());
+                EnumeratorResult result = new EnumeratorResult(nextSnapshotId, splits);
+                LOG.debug("Find snapshot id {}, it has {} splits.", nextSnapshotId, splits.size());
+
+                nextSnapshotId++;
+                return result;
+            }
+        }
+    }
+
+    /** A discovered snapshot id together with the splits planned for it. */
+    private static class EnumeratorResult {
+
+        private final long snapshotId;
+
+        private final List<FileStoreSourceSplit> splits;
+
+        private EnumeratorResult(long snapshotId, List<FileStoreSourceSplit> splits) {
+            this.snapshotId = snapshotId;
+            this.splits = splits;
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index 6a72c55..36b7c68 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.store.file.predicate.Predicate;
import javax.annotation.Nullable;
+import java.util.Collection;
+
import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
/** {@link Source} of file store. */
@@ -45,6 +47,10 @@ public class FileStoreSource
private final boolean valueCountMode;
+ private final boolean isContinuous;
+
+ private final long discoveryInterval;
+
@Nullable private final int[][] projectedFields;
@Nullable private final Predicate partitionPredicate;
@@ -54,11 +60,15 @@ public class FileStoreSource
public FileStoreSource(
FileStore fileStore,
boolean valueCountMode,
+ boolean isContinuous,
+ long discoveryInterval,
@Nullable int[][] projectedFields,
@Nullable Predicate partitionPredicate,
final Predicate fieldPredicate) {
this.fileStore = fileStore;
this.valueCountMode = valueCountMode;
+ this.isContinuous = isContinuous;
+ this.discoveryInterval = discoveryInterval;
this.projectedFields = projectedFields;
this.partitionPredicate = partitionPredicate;
this.fieldPredicate = fieldPredicate;
@@ -73,20 +83,34 @@ public class FileStoreSource
@Override
public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
FileStoreRead read = fileStore.newRead();
+
+ if (isContinuous) {
+ read.withDropDelete(false);
+ }
+
if (projectedFields != null) {
if (valueCountMode) {
- // TODO don't project keys, and add key projection to split reader
+ // TODO when isContinuous is false, don't project keys, and add key projection to
+ // split reader
read.withKeyProjection(projectedFields);
} else {
read.withValueProjection(projectedFields);
}
}
+
return new FileStoreSourceReader(context, read, valueCountMode);
}
@Override
public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> createEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
+ return restoreEnumerator(context, null);
+ }
+
+ @Override
+ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+ SplitEnumeratorContext<FileStoreSourceSplit> context,
+ PendingSplitsCheckpoint checkpoint) {
FileStoreScan scan = fileStore.newScan();
if (partitionPredicate != null) {
scan.withPartitionFilter(partitionPredicate);
@@ -98,18 +122,33 @@ public class FileStoreSource
scan.withValueFilter(fieldPredicate);
}
}
- return new StaticFileStoreSplitEnumerator(context, scan);
- }
- @Override
- public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
- SplitEnumeratorContext<FileStoreSourceSplit> context,
- PendingSplitsCheckpoint checkpoint) {
- Snapshot snapshot = null;
- if (checkpoint.currentSnapshotId() != INVALID_SNAPSHOT) {
- snapshot = fileStore.newScan().snapshot(checkpoint.currentSnapshotId());
+ Long snapshotId;
+ Collection<FileStoreSourceSplit> splits;
+ if (checkpoint == null) {
+ FileStoreScan.Plan plan = scan.plan();
+ snapshotId = plan.snapshotId();
+ splits = new FileStoreSourceSplitGenerator().createSplits(plan);
+ } else {
+ snapshotId = checkpoint.currentSnapshotId();
+ if (snapshotId == INVALID_SNAPSHOT) {
+ snapshotId = null;
+ }
+ splits = checkpoint.splits();
+ }
+
+ if (isContinuous) {
+ long currentSnapshot = snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
+ return new ContinuousFileSplitEnumerator(
+ context,
+ scan.withIncremental(true),
+ splits,
+ currentSnapshot,
+ discoveryInterval);
+ } else {
+ Snapshot snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
+ return new StaticFileStoreSplitEnumerator(context, snapshot, splits);
}
- return new StaticFileStoreSplitEnumerator(context, snapshot, checkpoint.splits());
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 4fc828b..c289a60 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
import javax.annotation.Nullable;
@@ -52,15 +51,6 @@ public class StaticFileStoreSplitEnumerator
this.splits = new LinkedList<>(splits);
}
- public StaticFileStoreSplitEnumerator(
- SplitEnumeratorContext<FileStoreSourceSplit> context, FileStoreScan scan) {
- this.context = context;
- FileStoreScan.Plan plan = scan.plan();
- Long snapshotId = plan.snapshotId();
- this.snapshot = snapshotId == null ? null : scan.snapshot(snapshotId);
- this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(plan));
- }
-
@Override
public void start() {
// no resources to start
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 7c499d3..a0d5185 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assume;
@@ -55,6 +56,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
@@ -210,6 +212,78 @@ public class FileStoreITCase extends AbstractTestBase {
assertThat(results).containsExactlyInAnyOrder(expected);
}
+    @Test
+    public void testContinuous() throws Exception {
+        // primary key on field index 2
+        store.withPrimaryKeys(new int[] {2});
+        innerTestContinuous();
+    }
+
+    @Test
+    public void testContinuousWithoutPK() throws Exception {
+        // no primary key
+        store.withPrimaryKeys(new int[0]);
+        innerTestContinuous();
+    }
+
+    /**
+     * Starts a continuous read of the store, then writes two batches. Checks that each batch is
+     * observed by the continuous reader, including the DELETE row of the second batch.
+     */
+    private void innerTestContinuous() throws Exception {
+        Assume.assumeFalse(isBatch);
+
+        CloseableIterator<RowData> iterator =
+                store.sourceBuilder().withContinuousMode(true).build(env).executeAndCollect();
+        // random delay, presumably so the first write lands before or after the source has
+        // started discovering snapshots
+        Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
+
+        List<RowData> firstBatch =
+                Arrays.asList(
+                        srcRow(RowKind.INSERT, 1, "p1", 1), srcRow(RowKind.INSERT, 2, "p2", 2));
+        sinkAndValidate(
+                firstBatch,
+                iterator,
+                Row.ofKind(RowKind.INSERT, 1, "p1", 1),
+                Row.ofKind(RowKind.INSERT, 2, "p2", 2));
+
+        List<RowData> secondBatch =
+                Arrays.asList(
+                        srcRow(RowKind.DELETE, 1, "p1", 1), srcRow(RowKind.INSERT, 3, "p3", 3));
+        sinkAndValidate(
+                secondBatch,
+                iterator,
+                Row.ofKind(RowKind.DELETE, 1, "p1", 1),
+                Row.ofKind(RowKind.INSERT, 3, "p3", 3));
+    }
+
+    /**
+     * Runs a streaming job that writes {@code src} into the store once. Then asserts that the next
+     * rows read from the continuous {@code iterator} are exactly {@code expected}, in any order.
+     */
+    private void sinkAndValidate(
+            List<RowData> src, CloseableIterator<RowData> iterator, Row... expected)
+            throws Exception {
+        if (isBatch) {
+            throw new UnsupportedOperationException();
+        }
+        FiniteTestSource<RowData> emitOnceSource = new FiniteTestSource<>(src, true);
+        DataStreamSource<RowData> input =
+                env.addSource(emitOnceSource, InternalTypeInfo.of(TABLE_TYPE));
+        store.sinkBuilder().withInput(input).build();
+        env.execute();
+        List<Row> actual = collectFromUnbounded(iterator, expected.length);
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Builds a source row of the given kind with fields (v, p, k), passed through {@code wrap}. */
+    private static RowData srcRow(RowKind kind, int v, String p, int k) {
+        GenericRowData row = GenericRowData.ofKind(kind, v, StringData.fromString(p), k);
+        return wrap(row);
+    }
+
+    /**
+     * Reads exactly {@code numElements} rows from a (possibly unbounded) iterator.
+     *
+     * @throws IllegalArgumentException if the iterator ends before enough rows were read
+     */
+    private List<Row> collectFromUnbounded(CloseableIterator<RowData> iterator, int numElements) {
+        if (numElements == 0) {
+            return Collections.emptyList();
+        }
+
+        List<Row> collected = new ArrayList<>();
+        while (collected.size() != numElements) {
+            if (!iterator.hasNext()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The stream ended before reaching the requested %d records. Only %d records were received.",
+                                numElements, collected.size()));
+            }
+            collected.add(CONVERTER.toExternal(iterator.next()));
+        }
+        return collected;
+    }
+
public static StreamExecutionEnvironment buildStreamEnv() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -251,7 +325,8 @@ public class FileStoreITCase extends AbstractTestBase {
return isBatch
? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE))
: env.addSource(
- new FiniteTestSource<>(SOURCE_DATA), InternalTypeInfo.of(TABLE_TYPE));
+ new FiniteTestSource<>(SOURCE_DATA, false),
+ InternalTypeInfo.of(TABLE_TYPE));
}
public static List<Row> executeAndCollect(DataStreamSource<RowData> source) throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
index 0e60cea..c37643e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
@@ -38,14 +38,14 @@ import java.util.List;
*
* <p>The reason this class is rewritten is to support {@link CheckpointedFunction}.
*/
-@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class FiniteTestSource<T>
implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
private static final long serialVersionUID = 1L;
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
- private final Iterable<T> elements;
+ private final List<T> elements;
+
+ private final boolean emitOnce;
private volatile boolean running = true;
@@ -55,8 +55,9 @@ public class FiniteTestSource<T>
private volatile int numTimesEmitted;
- public FiniteTestSource(Iterable<T> elements) {
+ public FiniteTestSource(List<T> elements, boolean emitOnce) {
this.elements = elements;
+ this.emitOnce = emitOnce;
}
@Override
@@ -92,11 +93,11 @@ public class FiniteTestSource<T>
public void run(SourceContext<T> ctx) throws Exception {
switch (numTimesEmitted) {
case 0:
- emitElementsAndWaitForCheckpoints(ctx);
- emitElementsAndWaitForCheckpoints(ctx);
+ emitElementsAndWaitForCheckpoints(ctx, false);
+ emitElementsAndWaitForCheckpoints(ctx, true);
break;
case 1:
- emitElementsAndWaitForCheckpoints(ctx);
+ emitElementsAndWaitForCheckpoints(ctx, true);
break;
case 2:
// Maybe missed notifyCheckpointComplete, wait next notifyCheckpointComplete
@@ -111,15 +112,17 @@ public class FiniteTestSource<T>
}
}
- private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx)
+ private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx, boolean isSecond)
throws InterruptedException {
final Object lock = ctx.getCheckpointLock();
final int checkpointToAwait;
synchronized (lock) {
checkpointToAwait = numCheckpointsComplete + 2;
- for (T t : elements) {
- ctx.collect(t);
+ if (!isSecond || !emitOnce) {
+ for (T t : elements) {
+ ctx.collect(t);
+ }
}
numTimesEmitted++;
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index eff7b77..1caf5e9 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -146,19 +146,7 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
});
// TODO: streaming with change-tracking
- // exception case
- specs.add(
- new Object[] {
- RuntimeExecutionMode.STREAMING,
- "table_" + UUID.randomUUID(),
- false, // enable change-tracking
- false, // has pk
- null, // with duplicate
- new ExpectedResult()
- .success(false)
- .expectedType(UnsupportedOperationException.class)
- .expectedMessage("File store continuous mode is not supported yet.")
- });
+ // TODO: streaming without log system
// TODO: add overwrite case
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 41b0490..aec50ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -97,6 +97,12 @@ public class FileStoreOptions implements Serializable {
.defaultValue(Duration.ofDays(1))
.withDescription("The maximum time of completed snapshots to retain.");
+ /** Interval at which a continuous file store source looks for new snapshots (default 1s). */
+ public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
+ ConfigOptions.key("continuous.discovery-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("The discovery interval of continuous reading.");
+
private final Configuration options;
public static Set<ConfigOption<?>> allOptions() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 06dad34..3718dd2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -38,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/** Scan operation which produces a plan. */
public interface FileStoreScan {
+ boolean snapshotExists(long snapshotId);
+
Snapshot snapshot(long snapshotId);
FileStoreScan withPartitionFilter(Predicate predicate);
@@ -54,6 +56,8 @@ public interface FileStoreScan {
FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
+ FileStoreScan withIncremental(boolean isIncremental);
+
/** Produce a {@link Plan}. */
Plan plan();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index ab1dd15..bccb0f2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -37,6 +38,8 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -62,6 +65,7 @@ public class FileStoreScanImpl implements FileStoreScan {
private Long specifiedSnapshotId = null;
private Integer specifiedBucket = null;
private List<ManifestFileMeta> specifiedManifests = null;
+ private boolean isIncremental = false;
public FileStoreScanImpl(
RowType partitionType,
@@ -75,6 +79,16 @@ public class FileStoreScanImpl implements FileStoreScan {
}
@Override
+    public boolean snapshotExists(long snapshotId) {
+        // A snapshot exists when its snapshot file is present on the file system.
+        final Path snapshotPath = pathFactory.toSnapshotPath(snapshotId);
+        final boolean exists;
+        try {
+            exists = snapshotPath.getFileSystem().exists(snapshotPath);
+        } catch (IOException ioe) {
+            // the interface method declares no checked exception, so rethrow unchecked
+            throw new UncheckedIOException(ioe);
+        }
+        return exists;
+    }
+
+ @Override
public Snapshot snapshot(long snapshotId) {
return Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
}
@@ -149,6 +163,12 @@ public class FileStoreScanImpl implements FileStoreScan {
}
@Override
+ public FileStoreScan withIncremental(boolean isIncremental) {
+ // when true, plan() reads only the snapshot's delta manifest list instead of all manifests
+ this.isIncremental = isIncremental;
+ return this;
+ }
+
+ @Override
public Plan plan() {
List<ManifestFileMeta> manifests = specifiedManifests;
Long snapshotId = specifiedSnapshotId;
@@ -159,7 +179,11 @@ public class FileStoreScanImpl implements FileStoreScan {
if (snapshotId == null) {
manifests = Collections.emptyList();
} else {
- manifests = snapshot(snapshotId).readAllManifests(manifestList);
+ Snapshot snapshot = snapshot(snapshotId);
+ manifests =
+ isIncremental
+ ? manifestList.read(snapshot.deltaManifestList())
+ : snapshot.readAllManifests(manifestList);
}
}