You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/17 16:34:52 UTC

[iceberg] branch master updated: Flink: FLIP-27 source enumerator (#4986)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c9442f1c0f Flink: FLIP-27 source enumerator (#4986)
c9442f1c0f is described below

commit c9442f1c0f0524b7ca5ac3ccbb927cc80b764239
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Fri Jun 17 09:34:47 2022 -0700

    Flink: FLIP-27 source enumerator (#4986)
---
 flink/v1.15/build.gradle                           |   1 +
 .../enumerator/AbstractIcebergEnumerator.java      | 168 +++++++++++++++++++++
 .../enumerator/ContinuousIcebergEnumerator.java    | 127 ++++++++++++++++
 .../IcebergEnumeratorPositionSerializer.java       |  89 +++++++++++
 .../source/enumerator/IcebergEnumeratorState.java  |  54 +++++++
 .../IcebergEnumeratorStateSerializer.java          | 115 ++++++++++++++
 .../source/enumerator/StaticIcebergEnumerator.java |  94 ++++++++++++
 .../TestContinuousIcebergEnumerator.java           | 164 ++++++++++++++++++++
 .../TestIcebergEnumeratorStateSerializer.java      |  89 +++++++++++
 9 files changed, 901 insertions(+)

diff --git a/flink/v1.15/build.gradle b/flink/v1.15/build.gradle
index 46572696f9..5d04cde8b6 100644
--- a/flink/v1.15/build.gradle
+++ b/flink/v1.15/build.gradle
@@ -61,6 +61,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
     }
 
+    testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
     testImplementation "org.apache.flink:flink-core:${flinkVersion}"
     testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
     testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
new file mode 100644
index 0000000000..98c5fa3ede
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.assigner.GetSplitResult;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: publish enumerator monitor metrics like number of pending metrics after FLINK-21000 is resolved
+ */
+abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
+
+  private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+  private final SplitAssigner assigner;
+  private final Map<Integer, String> readersAwaitingSplit;
+  private final AtomicReference<CompletableFuture<Void>> availableFuture;
+
+  AbstractIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner) {
+    this.enumeratorContext = enumeratorContext;
+    this.assigner = assigner;
+    this.readersAwaitingSplit = new LinkedHashMap<>();
+    this.availableFuture = new AtomicReference<>();
+  }
+
+  @Override
+  public void start() {
+    assigner.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    assigner.close();
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+    // Iceberg source uses custom split request event to piggyback finished split ids.
+    throw new UnsupportedOperationException(String.format("Received invalid default split request event " +
+        "from subtask %d as Iceberg source uses custom split request event", subtaskId));
+  }
+
+  @Override
+  public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+    if (sourceEvent instanceof SplitRequestEvent) {
+      SplitRequestEvent splitRequestEvent =
+          (SplitRequestEvent) sourceEvent;
+      LOG.info("Received request split event from subtask {}", subtaskId);
+      assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
+      readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
+      assignSplits();
+    } else {
+      throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s",
+          subtaskId, sourceEvent.getClass().getCanonicalName()));
+    }
+  }
+
+  @Override
+  public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
+    LOG.info("Add {} splits back to the pool for failed subtask {}",
+        splits.size(), subtaskId);
+    assigner.onUnassignedSplits(splits);
+    assignSplits();
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    LOG.info("Added reader: {}", subtaskId);
+  }
+
+  private void assignSplits() {
+    LOG.info("Assigning splits for {} awaiting readers", readersAwaitingSplit.size());
+    Iterator<Map.Entry<Integer, String>> awaitingReader =
+        readersAwaitingSplit.entrySet().iterator();
+    while (awaitingReader.hasNext()) {
+      Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+      // if the reader that requested another split has failed in the meantime, remove
+      // it from the list of waiting readers
+      if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
+        awaitingReader.remove();
+        continue;
+      }
+
+      int awaitingSubtask = nextAwaiting.getKey();
+      String hostname = nextAwaiting.getValue();
+      GetSplitResult getResult = assigner.getNext(hostname);
+      if (getResult.status() == GetSplitResult.Status.AVAILABLE) {
+        LOG.info("Assign split to subtask {}: {}", awaitingSubtask, getResult.split());
+        enumeratorContext.assignSplit(getResult.split(), awaitingSubtask);
+        awaitingReader.remove();
+      } else if (getResult.status() == GetSplitResult.Status.CONSTRAINED) {
+        getAvailableFutureIfNeeded();
+        break;
+      } else if (getResult.status() == GetSplitResult.Status.UNAVAILABLE) {
+        if (shouldWaitForMoreSplits()) {
+          getAvailableFutureIfNeeded();
+          break;
+        } else {
+          LOG.info("No more splits available for subtask {}", awaitingSubtask);
+          enumeratorContext.signalNoMoreSplits(awaitingSubtask);
+          awaitingReader.remove();
+        }
+      } else {
+        throw new IllegalArgumentException("Unsupported status: " + getResult.status());
+      }
+    }
+  }
+
+  /**
+   * return true if enumerator should wait for splits
+   * like in the continuous enumerator case
+   */
+  protected abstract boolean shouldWaitForMoreSplits();
+
+  private synchronized void getAvailableFutureIfNeeded() {
+    if (availableFuture.get() != null) {
+      return;
+    }
+    CompletableFuture<Void> future = assigner.isAvailable()
+        .thenAccept(ignore ->
+            // Must run assignSplits in coordinator thread
+            // because the future may be completed from other threads.
+            // E.g., in event time alignment assigner,
+            // watermark advancement from another source may
+            // cause the available future to be completed
+            enumeratorContext.runInCoordinatorThread(() -> {
+              LOG.debug("Executing callback of assignSplits");
+              availableFuture.set(null);
+              assignSplits();
+            }));
+    availableFuture.set(future);
+    LOG.debug("Registered callback for future available splits");
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..e2b94b8c3e
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
+
+  private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+  private final SplitAssigner assigner;
+  private final ScanContext scanContext;
+  private final ContinuousSplitPlanner splitPlanner;
+
+  /**
+   * snapshotId for the last enumerated snapshot. next incremental enumeration
+   * should be based off this as the starting position.
+   */
+  private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
+
+  public ContinuousIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner,
+      @Nullable IcebergEnumeratorState enumState) {
+    super(enumeratorContext, assigner);
+
+    this.enumeratorContext = enumeratorContext;
+    this.assigner = assigner;
+    this.scanContext = scanContext;
+    this.splitPlanner = splitPlanner;
+    this.enumeratorPosition = new AtomicReference<>();
+    if (enumState != null) {
+      this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    enumeratorContext.callAsync(
+        this::discoverSplits,
+        this::processDiscoveredSplits,
+        0L,
+        scanContext.monitorInterval().toMillis());
+  }
+
+  @Override
+  public void close() throws IOException {
+    splitPlanner.close();
+    super.close();
+  }
+
+  @Override
+  protected boolean shouldWaitForMoreSplits() {
+    return true;
+  }
+
+  @Override
+  public IcebergEnumeratorState snapshotState(long checkpointId) {
+    return new IcebergEnumeratorState(enumeratorPosition.get(), assigner.state());
+  }
+
+  /**
+   * This method is executed in an IO thread pool.
+   */
+  private ContinuousEnumerationResult discoverSplits() {
+    return splitPlanner.planSplits(enumeratorPosition.get());
+  }
+
+  /**
+   * This method is executed in a single coordinator thread.
+   */
+  private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
+    if (error == null) {
+      if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
+        // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O thread pool.
+        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the thread
+        // pool is busy and multiple discovery actions are executed concurrently. Discovery result should
+        // only be accepted if the starting position matches the enumerator position (like compare-and-swap).
+        LOG.info("Skip {} discovered splits because the scan starting position doesn't match " +
+                "the current enumerator position: enumerator position = {}, scan starting position = {}",
+            result.splits().size(), enumeratorPosition.get(), result.fromPosition());
+      } else {
+        assigner.onDiscoveredSplits(result.splits());
+        LOG.info("Added {} splits discovered between ({}, {}] to the assigner",
+            result.splits().size(), result.fromPosition(), result.toPosition());
+        // update the enumerator position even if there is no split discovered
+        // or the toPosition is empty (e.g. for empty table).
+        enumeratorPosition.set(result.toPosition());
+        LOG.info("Update enumerator position to {}", result.toPosition());
+      }
+    } else {
+      LOG.error("Failed to discover new splits", error);
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
new file mode 100644
index 0000000000..83b230e80e
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+class IcebergEnumeratorPositionSerializer implements SimpleVersionedSerializer<IcebergEnumeratorPosition> {
+
+  public static final IcebergEnumeratorPositionSerializer INSTANCE = new IcebergEnumeratorPositionSerializer();
+
+  private static final int VERSION = 1;
+
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(128));
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorPosition position) throws IOException {
+    return serializeV1(position);
+  }
+
+  @Override
+  public IcebergEnumeratorPosition deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorPosition position) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+    out.writeBoolean(position.snapshotId() != null);
+    if (position.snapshotId() != null) {
+      out.writeLong(position.snapshotId());
+    }
+    out.writeBoolean(position.snapshotTimestampMs() != null);
+    if (position.snapshotTimestampMs() != null) {
+      out.writeLong(position.snapshotTimestampMs());
+    }
+    byte[] result = out.getCopyOfBuffer();
+    out.clear();
+    return result;
+  }
+
+  private IcebergEnumeratorPosition deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+    Long snapshotId = null;
+    if (in.readBoolean()) {
+      snapshotId = in.readLong();
+    }
+
+    Long snapshotTimestampMs = null;
+    if (in.readBoolean()) {
+      snapshotTimestampMs = in.readLong();
+    }
+
+    if (snapshotId != null) {
+      return IcebergEnumeratorPosition.of(snapshotId, snapshotTimestampMs);
+    } else {
+      return IcebergEnumeratorPosition.empty();
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
new file mode 100644
index 0000000000..bd2f44c005
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
+/**
+ * Enumerator state for checkpointing
+ */
+public class IcebergEnumeratorState implements Serializable {
+  @Nullable
+  private final IcebergEnumeratorPosition lastEnumeratedPosition;
+  private final Collection<IcebergSourceSplitState> pendingSplits;
+
+  public IcebergEnumeratorState(Collection<IcebergSourceSplitState> pendingSplits) {
+    this(null, pendingSplits);
+  }
+
+  public IcebergEnumeratorState(
+      @Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
+      Collection<IcebergSourceSplitState> pendingSplits) {
+    this.lastEnumeratedPosition = lastEnumeratedPosition;
+    this.pendingSplits = pendingSplits;
+  }
+
+  @Nullable
+  public IcebergEnumeratorPosition lastEnumeratedPosition() {
+    return lastEnumeratedPosition;
+  }
+
+  public Collection<IcebergSourceSplitState> pendingSplits() {
+    return pendingSplits;
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..8f020bbe53
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+@Internal
+public class IcebergEnumeratorStateSerializer implements SimpleVersionedSerializer<IcebergEnumeratorState> {
+
+  public static final IcebergEnumeratorStateSerializer INSTANCE = new IcebergEnumeratorStateSerializer();
+
+  private static final int VERSION = 1;
+
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
+
+  private final IcebergEnumeratorPositionSerializer positionSerializer = IcebergEnumeratorPositionSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer = IcebergSourceSplitSerializer.INSTANCE;
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorState enumState) throws IOException {
+    return serializeV1(enumState);
+  }
+
+  @Override
+  public IcebergEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+    out.writeBoolean(enumState.lastEnumeratedPosition() != null);
+    if (enumState.lastEnumeratedPosition() != null) {
+      out.writeInt(positionSerializer.getVersion());
+      byte[] positionBytes = positionSerializer.serialize(enumState.lastEnumeratedPosition());
+      out.writeInt(positionBytes.length);
+      out.write(positionBytes);
+    }
+
+    out.writeInt(splitSerializer.getVersion());
+    out.writeInt(enumState.pendingSplits().size());
+    for (IcebergSourceSplitState splitState : enumState.pendingSplits()) {
+      byte[] splitBytes = splitSerializer.serialize(splitState.split());
+      out.writeInt(splitBytes.length);
+      out.write(splitBytes);
+      out.writeUTF(splitState.status().name());
+    }
+
+    byte[] result = out.getCopyOfBuffer();
+    out.clear();
+    return result;
+  }
+
+  private IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+    IcebergEnumeratorPosition enumeratorPosition = null;
+    if (in.readBoolean()) {
+      int version = in.readInt();
+      byte[] positionBytes = new byte[in.readInt()];
+      in.read(positionBytes);
+      enumeratorPosition = positionSerializer.deserialize(version, positionBytes);
+    }
+
+    int splitSerializerVersion = in.readInt();
+    int splitCount = in.readInt();
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayListWithCapacity(splitCount);
+    for (int i = 0; i < splitCount; ++i) {
+      byte[] splitBytes = new byte[in.readInt()];
+      in.read(splitBytes);
+      IcebergSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, splitBytes);
+      String statusName = in.readUTF();
+      pendingSplits.add(new IcebergSourceSplitState(split, IcebergSourceSplitStatus.valueOf(statusName)));
+    }
+    return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
new file mode 100644
index 0000000000..0f287864be
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * One-time split enumeration at the start-up
+ */
+@Internal
+public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {
+  private static final Logger LOG = LoggerFactory.getLogger(StaticIcebergEnumerator.class);
+
+  private final SplitAssigner assigner;
+  private final Table table;
+  private final ScanContext scanContext;
+  private final boolean shouldEnumerate;
+
+  public StaticIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      Table table,
+      ScanContext scanContext,
+      @Nullable IcebergEnumeratorState enumState) {
+    super(enumeratorContext, assigner);
+    this.assigner = assigner;
+    this.table = table;
+    this.scanContext = scanContext;
+    // split enumeration is not needed during restore scenario
+    this.shouldEnumerate = enumState == null;
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    if (shouldEnumerate) {
+      // Ideally, operatorId should be used as the threadPoolName as Flink guarantees its uniqueness within a job.
+      // SplitEnumeratorContext doesn't expose the OperatorCoordinator.Context, which would contain the OperatorID.
+      // Need to discuss with Flink community whether it is ok to expose a public API like the protected method
+      // "OperatorCoordinator.Context getCoordinatorContext()" from SourceCoordinatorContext implementation.
+      // For now, <table name>-<random UUID> is used as the unique thread pool name.
+      String threadName = "iceberg-plan-worker-pool-" + table.name() + "-" + UUID.randomUUID();
+      ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
+      try {
+        List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(table, scanContext, workerPool);
+        assigner.onDiscoveredSplits(splits);
+        LOG.info("Discovered {} splits from table {} during job initialization",
+            splits.size(), table.name());
+      } finally {
+        workerPool.shutdown();
+      }
+    }
+  }
+
+  @Override
+  protected boolean shouldWaitForMoreSplits() {
+    return false;
+  }
+
+  @Override
+  public IcebergEnumeratorState snapshotState(long checkpointId) {
+    return new IcebergEnumeratorState(null, assigner.state());
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..5e7f926e3a
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestContinuousIcebergEnumerator {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    Collection<IcebergSourceSplitState> pendingSplitsEmpty = enumerator.snapshotState(1).pendingSplits();
+    Assert.assertEquals(0, pendingSplitsEmpty.size());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testDiscoverWhenReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register one reader, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2,
+        new SplitRequestEvent());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    Assert.assertTrue(enumerator.snapshotState(1).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  @Test
+  public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext config = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, config, splitPlanner);
+
+    // register one reader, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2,
+        new SplitRequestEvent());
+
+    // remove the reader (like in a failure)
+    enumeratorContext.registeredReaders().remove(2);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    Assert.assertEquals(1, splits.size());
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(2));
+    List<String> pendingSplitIds = enumerator.snapshotState(1).pendingSplits().stream()
+        .map(IcebergSourceSplitState::split)
+        .map(IcebergSourceSplit::splitId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(splits.size(), pendingSplitIds.size());
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplitIds.get(0));
+
+    // register the reader again, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2,
+        new SplitRequestEvent());
+
+    Assert.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  private static ContinuousIcebergEnumerator createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> context,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner) {
+
+    ContinuousIcebergEnumerator enumerator =
+        new ContinuousIcebergEnumerator(
+            context,
+            new SimpleSplitAssigner(Collections.emptyList()),
+            scanContext,
+            splitPlanner,
+            null);
+    enumerator.start();
+    return enumerator;
+  }
+
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..05abe7bc17
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergEnumeratorStateSerializer {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final IcebergEnumeratorStateSerializer serializer = IcebergEnumeratorStateSerializer.INSTANCE;
+
+  @Test
+  public void testEmptySnapshotIdAndPendingSplits() throws Exception {
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(Collections.emptyList());
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndEmptyPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(1L, System.currentTimeMillis());
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(position, Collections.emptyList());
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(2L, System.currentTimeMillis());
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 3, 1);
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayList();
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(0), IcebergSourceSplitStatus.UNASSIGNED));
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(1), IcebergSourceSplitStatus.ASSIGNED));
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(2), IcebergSourceSplitStatus.COMPLETED));
+
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(position, pendingSplits);
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  private void assertEnumeratorStateEquals(IcebergEnumeratorState expected, IcebergEnumeratorState actual) {
+    Assert.assertEquals(expected.lastEnumeratedPosition(), actual.lastEnumeratedPosition());
+    Assert.assertEquals(expected.pendingSplits().size(), actual.pendingSplits().size());
+    Iterator<IcebergSourceSplitState> expectedIterator = expected.pendingSplits().iterator();
+    Iterator<IcebergSourceSplitState> actualIterator = actual.pendingSplits().iterator();
+    for (int i = 0; i < expected.pendingSplits().size(); ++i) {
+      IcebergSourceSplitState expectedSplitState = expectedIterator.next();
+      IcebergSourceSplitState actualSplitState = actualIterator.next();
+      Assert.assertEquals(expectedSplitState.split().splitId(), actualSplitState.split().splitId());
+      Assert.assertEquals(expectedSplitState.split().fileOffset(), actualSplitState.split().fileOffset());
+      Assert.assertEquals(expectedSplitState.split().recordOffset(), actualSplitState.split().recordOffset());
+      Assert.assertEquals(expectedSplitState.status(), actualSplitState.status());
+    }
+  }
+}