You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/17 16:34:52 UTC
[iceberg] branch master updated: Flink: FLIP-27 source enumerator (#4986)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c9442f1c0f Flink: FLIP-27 source enumerator (#4986)
c9442f1c0f is described below
commit c9442f1c0f0524b7ca5ac3ccbb927cc80b764239
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Fri Jun 17 09:34:47 2022 -0700
Flink: FLIP-27 source enumerator (#4986)
---
flink/v1.15/build.gradle | 1 +
.../enumerator/AbstractIcebergEnumerator.java | 168 +++++++++++++++++++++
.../enumerator/ContinuousIcebergEnumerator.java | 127 ++++++++++++++++
.../IcebergEnumeratorPositionSerializer.java | 89 +++++++++++
.../source/enumerator/IcebergEnumeratorState.java | 54 +++++++
.../IcebergEnumeratorStateSerializer.java | 115 ++++++++++++++
.../source/enumerator/StaticIcebergEnumerator.java | 94 ++++++++++++
.../TestContinuousIcebergEnumerator.java | 164 ++++++++++++++++++++
.../TestIcebergEnumeratorStateSerializer.java | 89 +++++++++++
9 files changed, 901 insertions(+)
diff --git a/flink/v1.15/build.gradle b/flink/v1.15/build.gradle
index 46572696f9..5d04cde8b6 100644
--- a/flink/v1.15/build.gradle
+++ b/flink/v1.15/build.gradle
@@ -61,6 +61,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}
+ testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
testImplementation "org.apache.flink:flink-core:${flinkVersion}"
testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
new file mode 100644
index 0000000000..98c5fa3ede
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.assigner.GetSplitResult;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: publish enumerator monitor metrics like number of pending metrics after FLINK-21000 is resolved
+ */
+abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
+
+ private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+ private final SplitAssigner assigner;
+ private final Map<Integer, String> readersAwaitingSplit;
+ private final AtomicReference<CompletableFuture<Void>> availableFuture;
+
+ AbstractIcebergEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+ SplitAssigner assigner) {
+ this.enumeratorContext = enumeratorContext;
+ this.assigner = assigner;
+ this.readersAwaitingSplit = new LinkedHashMap<>();
+ this.availableFuture = new AtomicReference<>();
+ }
+
+ @Override
+ public void start() {
+ assigner.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ assigner.close();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ // Iceberg source uses custom split request event to piggyback finished split ids.
+ throw new UnsupportedOperationException(String.format("Received invalid default split request event " +
+ "from subtask %d as Iceberg source uses custom split request event", subtaskId));
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SplitRequestEvent) {
+ SplitRequestEvent splitRequestEvent =
+ (SplitRequestEvent) sourceEvent;
+ LOG.info("Received request split event from subtask {}", subtaskId);
+ assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
+ readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
+ assignSplits();
+ } else {
+ throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s",
+ subtaskId, sourceEvent.getClass().getCanonicalName()));
+ }
+ }
+
+ @Override
+ public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
+ LOG.info("Add {} splits back to the pool for failed subtask {}",
+ splits.size(), subtaskId);
+ assigner.onUnassignedSplits(splits);
+ assignSplits();
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ LOG.info("Added reader: {}", subtaskId);
+ }
+
+ private void assignSplits() {
+ LOG.info("Assigning splits for {} awaiting readers", readersAwaitingSplit.size());
+ Iterator<Map.Entry<Integer, String>> awaitingReader =
+ readersAwaitingSplit.entrySet().iterator();
+ while (awaitingReader.hasNext()) {
+ Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+ // if the reader that requested another split has failed in the meantime, remove
+ // it from the list of waiting readers
+ if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
+ awaitingReader.remove();
+ continue;
+ }
+
+ int awaitingSubtask = nextAwaiting.getKey();
+ String hostname = nextAwaiting.getValue();
+ GetSplitResult getResult = assigner.getNext(hostname);
+ if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
+ LOG.info("Assign split to subtask {}: {}", awaitingSubtask, getResult.split());
+ enumeratorContext.assignSplit(getResult.split(), awaitingSubtask);
+ awaitingReader.remove();
+ } else if (getResult.status() == GetSplitResult.Status.CONSTRAINED) {
+ getAvailableFutureIfNeeded();
+ break;
+ } else if (getResult.status() == GetSplitResult.Status.UNAVAILABLE) {
+ if (shouldWaitForMoreSplits()) {
+ getAvailableFutureIfNeeded();
+ break;
+ } else {
+ LOG.info("No more splits available for subtask {}", awaitingSubtask);
+ enumeratorContext.signalNoMoreSplits(awaitingSubtask);
+ awaitingReader.remove();
+ }
+ } else {
+ throw new IllegalArgumentException("Unsupported status: " + getResult.status());
+ }
+ }
+ }
+
+ /**
+ * return true if enumerator should wait for splits
+ * like in the continuous enumerator case
+ */
+ protected abstract boolean shouldWaitForMoreSplits();
+
+ private synchronized void getAvailableFutureIfNeeded() {
+ if (availableFuture.get() != null) {
+ return;
+ }
+ CompletableFuture<Void> future = assigner.isAvailable()
+ .thenAccept(ignore ->
+ // Must run assignSplits in coordinator thread
+ // because the future may be completed from other threads.
+ // E.g., in event time alignment assigner,
+ // watermark advancement from another source may
+ // cause the available future to be completed
+ enumeratorContext.runInCoordinatorThread(() -> {
+ LOG.debug("Executing callback of assignSplits");
+ availableFuture.set(null);
+ assignSplits();
+ }));
+ availableFuture.set(future);
+ LOG.debug("Registered callback for future available splits");
+ }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..e2b94b8c3e
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
+
+ private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+ private final SplitAssigner assigner;
+ private final ScanContext scanContext;
+ private final ContinuousSplitPlanner splitPlanner;
+
+ /**
+ * snapshotId for the last enumerated snapshot. next incremental enumeration
+ * should be based off this as the starting position.
+ */
+ private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
+
+ public ContinuousIcebergEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+ SplitAssigner assigner,
+ ScanContext scanContext,
+ ContinuousSplitPlanner splitPlanner,
+ @Nullable IcebergEnumeratorState enumState) {
+ super(enumeratorContext, assigner);
+
+ this.enumeratorContext = enumeratorContext;
+ this.assigner = assigner;
+ this.scanContext = scanContext;
+ this.splitPlanner = splitPlanner;
+ this.enumeratorPosition = new AtomicReference<>();
+ if (enumState != null) {
+ this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
+ }
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ enumeratorContext.callAsync(
+ this::discoverSplits,
+ this::processDiscoveredSplits,
+ 0L,
+ scanContext.monitorInterval().toMillis());
+ }
+
+ @Override
+ public void close() throws IOException {
+ splitPlanner.close();
+ super.close();
+ }
+
+ @Override
+ protected boolean shouldWaitForMoreSplits() {
+ return true;
+ }
+
+ @Override
+ public IcebergEnumeratorState snapshotState(long checkpointId) {
+ return new IcebergEnumeratorState(enumeratorPosition.get(), assigner.state());
+ }
+
+ /**
+ * This method is executed in an IO thread pool.
+ */
+ private ContinuousEnumerationResult discoverSplits() {
+ return splitPlanner.planSplits(enumeratorPosition.get());
+ }
+
+ /**
+ * This method is executed in a single coordinator thread.
+ */
+ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
+ if (error == null) {
+ if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
+ // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O thread pool.
+ // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the thread
+ // pool is busy and multiple discovery actions are executed concurrently. Discovery result should
+ // only be accepted if the starting position matches the enumerator position (like compare-and-swap).
+ LOG.info("Skip {} discovered splits because the scan starting position doesn't match " +
+ "the current enumerator position: enumerator position = {}, scan starting position = {}",
+ result.splits().size(), enumeratorPosition.get(), result.fromPosition());
+ } else {
+ assigner.onDiscoveredSplits(result.splits());
+ LOG.info("Added {} splits discovered between ({}, {}] to the assigner",
+ result.splits().size(), result.fromPosition(), result.toPosition());
+ // update the enumerator position even if there is no split discovered
+ // or the toPosition is empty (e.g. for empty table).
+ enumeratorPosition.set(result.toPosition());
+ LOG.info("Update enumerator position to {}", result.toPosition());
+ }
+ } else {
+ LOG.error("Failed to discover new splits", error);
+ }
+ }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
new file mode 100644
index 0000000000..83b230e80e
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+/**
+ * Versioned serializer for {@link IcebergEnumeratorPosition}. Version 1 writes the snapshot id and
+ * the snapshot timestamp, each as a presence flag followed by the long value when present.
+ */
+class IcebergEnumeratorPositionSerializer implements SimpleVersionedSerializer<IcebergEnumeratorPosition> {
+
+  public static final IcebergEnumeratorPositionSerializer INSTANCE = new IcebergEnumeratorPositionSerializer();
+
+  private static final int VERSION = 1;
+
+  // per-thread output buffer reused across serialize calls
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(128));
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorPosition position) throws IOException {
+    return serializeV1(position);
+  }
+
+  /**
+   * @throws IOException if {@code version} is not a known format version
+   */
+  @Override
+  public IcebergEnumeratorPosition deserialize(int version, byte[] serialized) throws IOException {
+    if (version == 1) {
+      return deserializeV1(serialized);
+    }
+    throw new IOException("Unknown version: " + version);
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorPosition position) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+    writeNullableLong(out, position.snapshotId());
+    writeNullableLong(out, position.snapshotTimestampMs());
+    byte[] bytes = out.getCopyOfBuffer();
+    // reset the shared buffer for the next call on this thread
+    out.clear();
+    return bytes;
+  }
+
+  private IcebergEnumeratorPosition deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+    Long snapshotId = readNullableLong(in);
+    Long snapshotTimestampMs = readNullableLong(in);
+    // without a snapshot id the position is the empty one, whatever the timestamp is
+    return snapshotId != null ?
+        IcebergEnumeratorPosition.of(snapshotId, snapshotTimestampMs) :
+        IcebergEnumeratorPosition.empty();
+  }
+
+  /** Writes a presence flag and, when the value is non-null, the value itself. */
+  private static void writeNullableLong(DataOutputSerializer out, Long value) throws IOException {
+    out.writeBoolean(value != null);
+    if (value != null) {
+      out.writeLong(value);
+    }
+  }
+
+  /** Reads a presence flag and, when it is set, the long that follows; otherwise returns null. */
+  private static Long readNullableLong(DataInputDeserializer in) throws IOException {
+    if (in.readBoolean()) {
+      return in.readLong();
+    }
+    return null;
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
new file mode 100644
index 0000000000..bd2f44c005
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
+/**
+ * Enumerator state for checkpointing: an optional last enumerated position plus the
+ * collection of pending split states. Instances only hold the references they are given.
+ */
+public class IcebergEnumeratorState implements Serializable {
+ // null when the state was created without a position (see the single-argument constructor)
+ @Nullable
+ private final IcebergEnumeratorPosition lastEnumeratedPosition;
+ // stored as passed in; no defensive copy is made
+ private final Collection<IcebergSourceSplitState> pendingSplits;
+
+ /**
+ * Creates a state without an enumerated position.
+ *
+ * @param pendingSplits pending split states to checkpoint
+ */
+ public IcebergEnumeratorState(Collection<IcebergSourceSplitState> pendingSplits) {
+ this(null, pendingSplits);
+ }
+
+ /**
+ * @param lastEnumeratedPosition last enumerated position, may be null
+ * @param pendingSplits pending split states to checkpoint
+ */
+ public IcebergEnumeratorState(
+ @Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
+ Collection<IcebergSourceSplitState> pendingSplits) {
+ this.lastEnumeratedPosition = lastEnumeratedPosition;
+ this.pendingSplits = pendingSplits;
+ }
+
+ /**
+ * @return the last enumerated position, or null if none was set
+ */
+ @Nullable
+ public IcebergEnumeratorPosition lastEnumeratedPosition() {
+ return lastEnumeratedPosition;
+ }
+
+ /**
+ * @return the same collection instance that was passed to the constructor
+ */
+ public Collection<IcebergSourceSplitState> pendingSplits() {
+ return pendingSplits;
+ }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..8f020bbe53
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Versioned serializer for {@link IcebergEnumeratorState}.
+ *
+ * <p>Version 1 layout: a presence flag for the enumerator position, followed (when present) by the
+ * position serializer version and the length-prefixed position bytes; then the split serializer
+ * version, the number of pending splits and, per split, the length-prefixed split bytes followed by
+ * the status name as a UTF string.
+ */
+@Internal
+public class IcebergEnumeratorStateSerializer implements SimpleVersionedSerializer<IcebergEnumeratorState> {
+
+  public static final IcebergEnumeratorStateSerializer INSTANCE = new IcebergEnumeratorStateSerializer();
+
+  private static final int VERSION = 1;
+
+  // per-thread output buffer reused across serialize calls
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
+
+  private final IcebergEnumeratorPositionSerializer positionSerializer = IcebergEnumeratorPositionSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer = IcebergSourceSplitSerializer.INSTANCE;
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorState enumState) throws IOException {
+    return serializeV1(enumState);
+  }
+
+  /**
+   * @throws IOException if {@code version} is unknown or the payload is truncated or malformed
+   */
+  @Override
+  public IcebergEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+    try {
+      IcebergEnumeratorPosition position = enumState.lastEnumeratedPosition();
+      out.writeBoolean(position != null);
+      if (position != null) {
+        out.writeInt(positionSerializer.getVersion());
+        byte[] positionBytes = positionSerializer.serialize(position);
+        out.writeInt(positionBytes.length);
+        out.write(positionBytes);
+      }
+
+      out.writeInt(splitSerializer.getVersion());
+      out.writeInt(enumState.pendingSplits().size());
+      for (IcebergSourceSplitState splitState : enumState.pendingSplits()) {
+        byte[] splitBytes = splitSerializer.serialize(splitState.split());
+        out.writeInt(splitBytes.length);
+        out.write(splitBytes);
+        out.writeUTF(splitState.status().name());
+      }
+
+      return out.getCopyOfBuffer();
+    } finally {
+      // The buffer is shared per thread. It must be reset even when a nested serializer throws,
+      // otherwise the partially written bytes would prefix the next serialized state on this thread.
+      out.clear();
+    }
+  }
+
+  private IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+    IcebergEnumeratorPosition enumeratorPosition = null;
+    if (in.readBoolean()) {
+      int version = in.readInt();
+      byte[] positionBytes = readLengthPrefixedBytes(in);
+      enumeratorPosition = positionSerializer.deserialize(version, positionBytes);
+    }
+
+    int splitSerializerVersion = in.readInt();
+    int splitCount = in.readInt();
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayListWithCapacity(splitCount);
+    for (int i = 0; i < splitCount; ++i) {
+      byte[] splitBytes = readLengthPrefixedBytes(in);
+      IcebergSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, splitBytes);
+      String statusName = in.readUTF();
+      pendingSplits.add(new IcebergSourceSplitState(split, IcebergSourceSplitStatus.valueOf(statusName)));
+    }
+    return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+  }
+
+  /**
+   * Reads an int length followed by exactly that many bytes.
+   *
+   * @throws IOException if the length is negative or fewer bytes than announced are left
+   */
+  private static byte[] readLengthPrefixedBytes(DataInputDeserializer in) throws IOException {
+    int length = in.readInt();
+    if (length < 0) {
+      throw new IOException("Invalid serialized length: " + length);
+    }
+    byte[] bytes = new byte[length];
+    // readFully fails on a short payload, whereas read(byte[]) may return fewer bytes
+    // and leave the rest of the array zero-filled
+    in.readFully(bytes);
+    return bytes;
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
new file mode 100644
index 0000000000..0f287864be
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enumerator that plans all splits once, when it is started, and never waits for more.
+ */
+@Internal
+public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticIcebergEnumerator.class);
+
+  private final SplitAssigner assigner;
+  private final Table table;
+  private final ScanContext scanContext;
+  private final boolean shouldEnumerate;
+
+  /**
+   * @param enumState restored checkpoint state, or null for a fresh start; when it is non-null
+   *     no split planning happens in {@link #start()}
+   */
+  public StaticIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      Table table,
+      ScanContext scanContext,
+      @Nullable IcebergEnumeratorState enumState) {
+    super(enumeratorContext, assigner);
+    this.assigner = assigner;
+    this.table = table;
+    this.scanContext = scanContext;
+    // split enumeration is not needed during restore scenario
+    this.shouldEnumerate = enumState == null;
+  }
+
+  /** Starts the assigner and, unless restoring from state, plans the table's splits synchronously. */
+  @Override
+  public void start() {
+    super.start();
+    if (!shouldEnumerate) {
+      return;
+    }
+
+    // Ideally, operatorId should be used as the threadPoolName as Flink guarantees its uniqueness within a job.
+    // SplitEnumeratorContext doesn't expose the OperatorCoordinator.Context, which would contain the OperatorID.
+    // Need to discuss with Flink community whether it is ok to expose a public API like the protected method
+    // "OperatorCoordinator.Context getCoordinatorContext()" from SourceCoordinatorContext implementation.
+    // For now, <table name>-<random UUID> is used as the unique thread pool name.
+    String poolName = "iceberg-plan-worker-pool-" + table.name() + "-" + UUID.randomUUID();
+    ExecutorService planningPool = ThreadPools.newWorkerPool(poolName, scanContext.planParallelism());
+    try {
+      List<IcebergSourceSplit> discovered =
+          FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, planningPool);
+      assigner.onDiscoveredSplits(discovered);
+      LOG.info("Discovered {} splits from table {} during job initialization",
+          discovered.size(), table.name());
+    } finally {
+      // the pool is only needed for this one planning pass
+      planningPool.shutdown();
+    }
+  }
+
+  @Override
+  protected boolean shouldWaitForMoreSplits() {
+    return false;
+  }
+
+  /** Captures the assigner state; a static enumerator has no enumerator position. */
+  @Override
+  public IcebergEnumeratorState snapshotState(long checkpointId) {
+    return new IcebergEnumeratorState(null, assigner.state());
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..5e7f926e3a
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+ /**
+  * Tests for {@link ContinuousIcebergEnumerator} driven by a {@link ManualContinuousSplitPlanner}
+  * and Flink's {@link TestingSplitEnumeratorContext}, so that split discovery and the periodic
+  * actions can be triggered explicitly from the test.
+  */
+ public class TestContinuousIcebergEnumerator {
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+   // subtask id of the single reader used by the tests
+   private static final int SUBTASK_ID = 2;
+
+   /** A discovered split stays pending and unassigned as long as no reader has asked for it. */
+   @Test
+   public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+     ManualContinuousSplitPlanner planner = new ManualContinuousSplitPlanner();
+     TestingSplitEnumeratorContext<IcebergSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
+     ContinuousIcebergEnumerator enumerator = createEnumerator(context, streamingScanContext(), planner);
+
+     // nothing has been discovered yet
+     Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+
+     // make one split available and trigger the periodic discovery
+     List<IcebergSourceSplit> discovered =
+         SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+     planner.addSplits(discovered, IcebergEnumeratorPosition.of(1L, 1L));
+     context.triggerAllActions();
+
+     Collection<IcebergSourceSplitState> pending = enumerator.snapshotState(2).pendingSplits();
+     Assert.assertEquals(1, pending.size());
+     IcebergSourceSplitState onlyPending = pending.iterator().next();
+     Assert.assertEquals(discovered.get(0).splitId(), onlyPending.split().splitId());
+     Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, onlyPending.status());
+   }
+
+   /** A split discovered after a registered reader requested one is assigned to that reader. */
+   @Test
+   public void testDiscoverWhenReaderRegistered() throws Exception {
+     ManualContinuousSplitPlanner planner = new ManualContinuousSplitPlanner();
+     TestingSplitEnumeratorContext<IcebergSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
+     ContinuousIcebergEnumerator enumerator = createEnumerator(context, streamingScanContext(), planner);
+
+     // register one reader, and let it request a split
+     registerReaderAndRequestSplit(context, enumerator, SUBTASK_ID);
+
+     // make one split available and trigger the periodic discovery
+     List<IcebergSourceSplit> discovered =
+         SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+     planner.addSplits(discovered, IcebergEnumeratorPosition.of(1L, 1L));
+     context.triggerAllActions();
+
+     Assert.assertTrue(enumerator.snapshotState(1).pendingSplits().isEmpty());
+     MatcherAssert.assertThat(
+         context.getSplitAssignments().get(SUBTASK_ID).getAssignedSplits(),
+         CoreMatchers.hasItem(discovered.get(0)));
+   }
+
+   /**
+    * A split discovered while the requesting reader is gone stays pending, and is handed out once
+    * the reader registers again and repeats its request.
+    */
+   @Test
+   public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
+     ManualContinuousSplitPlanner planner = new ManualContinuousSplitPlanner();
+     TestingSplitEnumeratorContext<IcebergSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
+     ContinuousIcebergEnumerator enumerator = createEnumerator(context, streamingScanContext(), planner);
+
+     // register one reader, and let it request a split
+     registerReaderAndRequestSplit(context, enumerator, SUBTASK_ID);
+
+     // remove the reader (like in a failure)
+     context.registeredReaders().remove(SUBTASK_ID);
+
+     // make one split available and trigger the periodic discovery
+     List<IcebergSourceSplit> discovered =
+         SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+     Assert.assertEquals(1, discovered.size());
+     planner.addSplits(discovered, IcebergEnumeratorPosition.of(1L, 1L));
+     context.triggerAllActions();
+
+     // the reader is gone, so the split must still be pending
+     Assert.assertFalse(context.getSplitAssignments().containsKey(SUBTASK_ID));
+     List<String> pendingIds = enumerator.snapshotState(1).pendingSplits().stream()
+         .map(state -> state.split().splitId())
+         .collect(Collectors.toList());
+     Assert.assertEquals(discovered.size(), pendingIds.size());
+     Assert.assertEquals(discovered.get(0).splitId(), pendingIds.get(0));
+
+     // register the reader again, and let it request a split
+     registerReaderAndRequestSplit(context, enumerator, SUBTASK_ID);
+
+     Assert.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
+     MatcherAssert.assertThat(
+         context.getSplitAssignments().get(SUBTASK_ID).getAssignedSplits(),
+         CoreMatchers.hasItem(discovered.get(0)));
+   }
+
+   /** Scan context shared by all tests: streaming read using TABLE_SCAN_THEN_INCREMENTAL. */
+   private static ScanContext streamingScanContext() {
+     return ScanContext.builder()
+         .streaming(true)
+         .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+         .build();
+   }
+
+   /** Registers a reader on "localhost" with the context and the enumerator, then sends a split request. */
+   private static void registerReaderAndRequestSplit(
+       TestingSplitEnumeratorContext<IcebergSourceSplit> context,
+       ContinuousIcebergEnumerator enumerator,
+       int subtaskId) {
+     context.registerReader(subtaskId, "localhost");
+     enumerator.addReader(subtaskId);
+     enumerator.handleSourceEvent(subtaskId, new SplitRequestEvent());
+   }
+
+   /**
+    * Creates and starts an enumerator backed by an initially empty {@link SimpleSplitAssigner} and
+    * no restored enumerator state.
+    */
+   private static ContinuousIcebergEnumerator createEnumerator(
+       SplitEnumeratorContext<IcebergSourceSplit> context,
+       ScanContext scanContext,
+       ContinuousSplitPlanner splitPlanner) {
+     SimpleSplitAssigner emptyAssigner = new SimpleSplitAssigner(Collections.emptyList());
+     ContinuousIcebergEnumerator started =
+         new ContinuousIcebergEnumerator(context, emptyAssigner, scanContext, splitPlanner, null);
+     started.start();
+     return started;
+   }
+ }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..05abe7bc17
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+ /**
+  * Round-trip tests for {@link IcebergEnumeratorStateSerializer}: each state is serialized,
+  * deserialized with the serializer's current version, and compared with the input.
+  */
+ public class TestIcebergEnumeratorStateSerializer {
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+   private final IcebergEnumeratorStateSerializer serializer = IcebergEnumeratorStateSerializer.INSTANCE;
+
+   /** State built from an empty pending-split collection only. */
+   @Test
+   public void testEmptySnapshotIdAndPendingSplits() throws Exception {
+     assertSurvivesRoundTrip(new IcebergEnumeratorState(Collections.emptyList()));
+   }
+
+   /** State with an enumerated position but no pending splits. */
+   @Test
+   public void testSomeSnapshotIdAndEmptyPendingSplits() throws Exception {
+     IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(1L, System.currentTimeMillis());
+     assertSurvivesRoundTrip(new IcebergEnumeratorState(position, Collections.emptyList()));
+   }
+
+   /** State with an enumerated position and one pending split in each status. */
+   @Test
+   public void testSomeSnapshotIdAndPendingSplits() throws Exception {
+     IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(2L, System.currentTimeMillis());
+     List<IcebergSourceSplit> splits =
+         SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 3, 1);
+
+     Collection<IcebergSourceSplitState> pending = Lists.newArrayList();
+     pending.add(new IcebergSourceSplitState(splits.get(0), IcebergSourceSplitStatus.UNASSIGNED));
+     pending.add(new IcebergSourceSplitState(splits.get(1), IcebergSourceSplitStatus.ASSIGNED));
+     pending.add(new IcebergSourceSplitState(splits.get(2), IcebergSourceSplitStatus.COMPLETED));
+
+     assertSurvivesRoundTrip(new IcebergEnumeratorState(position, pending));
+   }
+
+   /** Serializes the given state, reads it back, and checks that both states match. */
+   private void assertSurvivesRoundTrip(IcebergEnumeratorState original) throws Exception {
+     byte[] bytes = serializer.serialize(original);
+     IcebergEnumeratorState restored = serializer.deserialize(serializer.getVersion(), bytes);
+     assertEnumeratorStateEquals(original, restored);
+   }
+
+   /**
+    * Compares the last enumerated position and then the pending splits pairwise, in iteration
+    * order, by split id, file offset, record offset and status.
+    */
+   private void assertEnumeratorStateEquals(IcebergEnumeratorState expected, IcebergEnumeratorState actual) {
+     Assert.assertEquals(expected.lastEnumeratedPosition(), actual.lastEnumeratedPosition());
+     Assert.assertEquals(expected.pendingSplits().size(), actual.pendingSplits().size());
+
+     // sizes were just asserted equal, so the two collections can be walked in lock step
+     Iterator<IcebergSourceSplitState> actualStates = actual.pendingSplits().iterator();
+     for (IcebergSourceSplitState expectedState : expected.pendingSplits()) {
+       IcebergSourceSplitState actualState = actualStates.next();
+       IcebergSourceSplit expectedSplit = expectedState.split();
+       IcebergSourceSplit actualSplit = actualState.split();
+       Assert.assertEquals(expectedSplit.splitId(), actualSplit.splitId());
+       Assert.assertEquals(expectedSplit.fileOffset(), actualSplit.fileOffset());
+       Assert.assertEquals(expectedSplit.recordOffset(), actualSplit.recordOffset());
+       Assert.assertEquals(expectedState.status(), actualState.status());
+     }
+   }
+ }