You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/06/20 15:55:57 UTC

[iceberg] branch master updated: Flink: Port #4986, FLIP-27 source enumerator to 1.14 module (#5078)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a64417c5ce Flink: Port #4986, FLIP-27 source enumerator to 1.14 module (#5078)
a64417c5ce is described below

commit a64417c5ce6ae9aac2cf6e5f5675d224a9846afd
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon Jun 20 08:55:51 2022 -0700

    Flink: Port #4986, FLIP-27 source enumerator to 1.14 module (#5078)
---
 flink/v1.14/build.gradle                           |   1 +
 .../enumerator/AbstractIcebergEnumerator.java      |   1 +
 .../enumerator/ContinuousIcebergEnumerator.java    | 127 ++++++++++++++++
 .../IcebergEnumeratorPositionSerializer.java       |  89 +++++++++++
 .../source/enumerator/IcebergEnumeratorState.java  |  54 +++++++
 .../IcebergEnumeratorStateSerializer.java          | 115 +++++++++++++++
 .../source/enumerator/StaticIcebergEnumerator.java |   2 +-
 .../TestContinuousIcebergEnumerator.java           | 164 +++++++++++++++++++++
 .../TestIcebergEnumeratorStateSerializer.java      |  89 +++++++++++
 .../enumerator/AbstractIcebergEnumerator.java      |   1 +
 .../source/enumerator/StaticIcebergEnumerator.java |   2 +-
 11 files changed, 643 insertions(+), 2 deletions(-)

diff --git a/flink/v1.14/build.gradle b/flink/v1.14/build.gradle
index a0e01c8a4c..537c977e19 100644
--- a/flink/v1.14/build.gradle
+++ b/flink/v1.14/build.gradle
@@ -60,6 +60,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
     }
 
+    testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
     testImplementation "org.apache.flink:flink-core:${flinkVersion}"
     testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
     testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
similarity index 99%
copy from flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
copy to flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 98c5fa3ede..8c9be862bf 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -150,6 +150,7 @@ abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourc
     if (availableFuture.get() != null) {
       return;
     }
+
     CompletableFuture<Void> future = assigner.isAvailable()
         .thenAccept(ignore ->
             // Must run assignSplits in coordinator thread
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..e2b94b8c3e
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Split enumerator for continuous (streaming) reads. It periodically plans
+ * incremental splits starting from the last enumerated position and hands the
+ * discovered splits to the {@link SplitAssigner}.
+ */
+@Internal
+public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContinuousIcebergEnumerator.class);
+
+  private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
+  private final SplitAssigner assigner;
+  private final ScanContext scanContext;
+  private final ContinuousSplitPlanner splitPlanner;
+
+  /**
+   * snapshotId for the last enumerated snapshot. next incremental enumeration
+   * should be based off this as the starting position.
+   */
+  private final AtomicReference<IcebergEnumeratorPosition> enumeratorPosition;
+
+  /**
+   * @param enumeratorContext Flink enumerator context, also used to schedule the async discovery calls
+   * @param assigner receives discovered splits and tracks their assignment state
+   * @param scanContext scan configuration; supplies the monitor interval for discovery
+   * @param splitPlanner plans incremental splits from a given starting position
+   * @param enumState restored checkpoint state, or null when starting fresh
+   */
+  public ContinuousIcebergEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
+      SplitAssigner assigner,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner,
+      @Nullable IcebergEnumeratorState enumState) {
+    super(enumeratorContext, assigner);
+
+    this.enumeratorContext = enumeratorContext;
+    this.assigner = assigner;
+    this.scanContext = scanContext;
+    this.splitPlanner = splitPlanner;
+    this.enumeratorPosition = new AtomicReference<>();
+    // When restoring from a checkpoint, resume incremental discovery from the saved position.
+    if (enumState != null) {
+      this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    // Schedule periodic split discovery: discoverSplits runs on an I/O thread pool and
+    // processDiscoveredSplits runs on the coordinator thread (see method javadocs below).
+    enumeratorContext.callAsync(
+        this::discoverSplits,
+        this::processDiscoveredSplits,
+        0L,
+        scanContext.monitorInterval().toMillis());
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Close the planner first so no new discovery results are produced during shutdown.
+    splitPlanner.close();
+    super.close();
+  }
+
+  @Override
+  protected boolean shouldWaitForMoreSplits() {
+    // Streaming enumeration never finishes: readers should wait for future splits.
+    return true;
+  }
+
+  @Override
+  public IcebergEnumeratorState snapshotState(long checkpointId) {
+    // Checkpoint the last enumerated position together with the assigner's pending splits.
+    return new IcebergEnumeratorState(enumeratorPosition.get(), assigner.state());
+  }
+
+  /**
+   * This method is executed in an IO thread pool.
+   */
+  private ContinuousEnumerationResult discoverSplits() {
+    return splitPlanner.planSplits(enumeratorPosition.get());
+  }
+
+  /**
+   * This method is executed in a single coordinator thread.
+   */
+  private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
+    if (error == null) {
+      if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
+        // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O thread pool.
+        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the thread
+        // pool is busy and multiple discovery actions are executed concurrently. Discovery result should
+        // only be accepted if the starting position matches the enumerator position (like compare-and-swap).
+        LOG.info("Skip {} discovered splits because the scan starting position doesn't match " +
+                "the current enumerator position: enumerator position = {}, scan starting position = {}",
+            result.splits().size(), enumeratorPosition.get(), result.fromPosition());
+      } else {
+        assigner.onDiscoveredSplits(result.splits());
+        LOG.info("Added {} splits discovered between ({}, {}] to the assigner",
+            result.splits().size(), result.fromPosition(), result.toPosition());
+        // update the enumerator position even if there is no split discovered
+        // or the toPosition is empty (e.g. for empty table).
+        enumeratorPosition.set(result.toPosition());
+        LOG.info("Update enumerator position to {}", result.toPosition());
+      }
+    } else {
+      LOG.error("Failed to discover new splits", error);
+    }
+  }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
new file mode 100644
index 0000000000..83b230e80e
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorPositionSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+/**
+ * Versioned serializer for {@link IcebergEnumeratorPosition}.
+ *
+ * <p>V1 wire format: for each of snapshotId and snapshotTimestampMs, a presence
+ * boolean followed by the long value when it is present.
+ */
+class IcebergEnumeratorPositionSerializer implements SimpleVersionedSerializer<IcebergEnumeratorPosition> {
+
+  public static final IcebergEnumeratorPositionSerializer INSTANCE = new IcebergEnumeratorPositionSerializer();
+
+  private static final int VERSION = 1;
+
+  // Reuse a per-thread output buffer to avoid allocating on every serialize call.
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(128));
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorPosition position) throws IOException {
+    return serializeV1(position);
+  }
+
+  @Override
+  public IcebergEnumeratorPosition deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorPosition position) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+    out.writeBoolean(position.snapshotId() != null);
+    if (position.snapshotId() != null) {
+      out.writeLong(position.snapshotId());
+    }
+    out.writeBoolean(position.snapshotTimestampMs() != null);
+    if (position.snapshotTimestampMs() != null) {
+      out.writeLong(position.snapshotTimestampMs());
+    }
+    byte[] result = out.getCopyOfBuffer();
+    out.clear();  // reset the cached buffer for the next use on this thread
+    return result;
+  }
+
+  private IcebergEnumeratorPosition deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+    Long snapshotId = null;
+    if (in.readBoolean()) {
+      snapshotId = in.readLong();
+    }
+
+    Long snapshotTimestampMs = null;
+    if (in.readBoolean()) {
+      snapshotTimestampMs = in.readLong();
+    }
+
+    if (snapshotId != null) {
+      return IcebergEnumeratorPosition.of(snapshotId, snapshotTimestampMs);
+    } else {
+      // Without a snapshotId the position is treated as empty; any deserialized timestamp is discarded.
+      return IcebergEnumeratorPosition.empty();
+    }
+  }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
new file mode 100644
index 0000000000..bd2f44c005
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+
+/**
+ * Enumerator state for checkpointing
+ */
+public class IcebergEnumeratorState implements Serializable {
+  // Position of the last enumerated snapshot; null when no enumeration has happened yet.
+  @Nullable
+  private final IcebergEnumeratorPosition lastEnumeratedPosition;
+  // Splits (with their assignment status) captured as pending at checkpoint time.
+  private final Collection<IcebergSourceSplitState> pendingSplits;
+
+  // Convenience constructor for state that has no enumerated position yet.
+  public IcebergEnumeratorState(Collection<IcebergSourceSplitState> pendingSplits) {
+    this(null, pendingSplits);
+  }
+
+  public IcebergEnumeratorState(
+      @Nullable IcebergEnumeratorPosition lastEnumeratedPosition,
+      Collection<IcebergSourceSplitState> pendingSplits) {
+    this.lastEnumeratedPosition = lastEnumeratedPosition;
+    this.pendingSplits = pendingSplits;
+  }
+
+  @Nullable
+  public IcebergEnumeratorPosition lastEnumeratedPosition() {
+    return lastEnumeratedPosition;
+  }
+
+  public Collection<IcebergSourceSplitState> pendingSplits() {
+    return pendingSplits;
+  }
+}
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..8f020bbe53
--- /dev/null
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Versioned serializer for {@link IcebergEnumeratorState}.
+ *
+ * <p>V1 wire format: an optional position block (presence boolean, nested position
+ * serializer version, length-prefixed bytes), followed by the split serializer
+ * version, the split count, and for each split [length, bytes, status name].
+ */
+@Internal
+public class IcebergEnumeratorStateSerializer implements SimpleVersionedSerializer<IcebergEnumeratorState> {
+
+  public static final IcebergEnumeratorStateSerializer INSTANCE = new IcebergEnumeratorStateSerializer();
+
+  private static final int VERSION = 1;
+
+  // Reuse a per-thread output buffer to avoid re-allocating on every checkpoint.
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
+
+  private final IcebergEnumeratorPositionSerializer positionSerializer = IcebergEnumeratorPositionSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer = IcebergSourceSplitSerializer.INSTANCE;
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  @Override
+  public byte[] serialize(IcebergEnumeratorState enumState) throws IOException {
+    return serializeV1(enumState);
+  }
+
+  @Override
+  public IcebergEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+    switch (version) {
+      case 1:
+        return deserializeV1(serialized);
+      default:
+        throw new IOException("Unknown version: " + version);
+    }
+  }
+
+  private byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
+    DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+    // Optional enumerator position: presence flag, nested serializer version, length-prefixed bytes.
+    out.writeBoolean(enumState.lastEnumeratedPosition() != null);
+    if (enumState.lastEnumeratedPosition() != null) {
+      out.writeInt(positionSerializer.getVersion());
+      byte[] positionBytes = positionSerializer.serialize(enumState.lastEnumeratedPosition());
+      out.writeInt(positionBytes.length);
+      out.write(positionBytes);
+    }
+
+    out.writeInt(splitSerializer.getVersion());
+    out.writeInt(enumState.pendingSplits().size());
+    for (IcebergSourceSplitState splitState : enumState.pendingSplits()) {
+      byte[] splitBytes = splitSerializer.serialize(splitState.split());
+      out.writeInt(splitBytes.length);
+      out.write(splitBytes);
+      out.writeUTF(splitState.status().name());
+    }
+
+    byte[] result = out.getCopyOfBuffer();
+    out.clear();  // reset the cached buffer for the next use on this thread
+    return result;
+  }
+
+  private IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+    IcebergEnumeratorPosition enumeratorPosition = null;
+    if (in.readBoolean()) {
+      int version = in.readInt();
+      byte[] positionBytes = new byte[in.readInt()];
+      // readFully, not read: read(byte[]) may return before filling the buffer,
+      // and its return value was previously ignored.
+      in.readFully(positionBytes);
+      enumeratorPosition = positionSerializer.deserialize(version, positionBytes);
+    }
+
+    int splitSerializerVersion = in.readInt();
+    int splitCount = in.readInt();
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayListWithCapacity(splitCount);
+    for (int i = 0; i < splitCount; ++i) {
+      byte[] splitBytes = new byte[in.readInt()];
+      // readFully, not read: see comment above.
+      in.readFully(splitBytes);
+      IcebergSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, splitBytes);
+      String statusName = in.readUTF();
+      pendingSplits.add(new IcebergSourceSplitState(split, IcebergSourceSplitStatus.valueOf(statusName)));
+    }
+    return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
similarity index 98%
copy from flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
copy to flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
index 0f287864be..8d33b6f073 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * One-time split enumeration at the start-up
+ * One-time split enumeration at the start-up for batch execution
  */
 @Internal
 public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
new file mode 100644
index 0000000000..5e7f926e3a
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssigner;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit tests for {@link ContinuousIcebergEnumerator}, using Flink's
+ * {@link TestingSplitEnumeratorContext} to trigger the periodic discovery actions manually.
+ */
+public class TestContinuousIcebergEnumerator {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // no discovery has been triggered yet, so the checkpointed state holds no pending splits
+    Collection<IcebergSourceSplitState> pendingSplitsEmpty = enumerator.snapshotState(1).pendingSplits();
+    Assert.assertEquals(0, pendingSplitsEmpty.size());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    // with no reader to assign to, the discovered split stays pending in the assigner
+    Collection<IcebergSourceSplitState> pendingSplits = enumerator.snapshotState(2).pendingSplits();
+    Assert.assertEquals(1, pendingSplits.size());
+    IcebergSourceSplitState pendingSplit = pendingSplits.iterator().next();
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplit.split().splitId());
+    Assert.assertEquals(IcebergSourceSplitStatus.UNASSIGNED, pendingSplit.status());
+  }
+
+  @Test
+  public void testDiscoverWhenReaderRegistered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register one reader, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2,
+        new SplitRequestEvent());
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    // the discovered split should go straight to the requesting reader, leaving nothing pending
+    Assert.assertTrue(enumerator.snapshotState(1).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  @Test
+  public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
+    ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner();
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext config = ScanContext.builder()
+        .streaming(true)
+        .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        .build();
+    ContinuousIcebergEnumerator enumerator = createEnumerator(enumeratorContext, config, splitPlanner);
+
+    // register one reader, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2,
+        new SplitRequestEvent());
+
+    // remove the reader (like in a failure)
+    enumeratorContext.registeredReaders().remove(2);
+
+    // make one split available and trigger the periodic discovery
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    Assert.assertEquals(1, splits.size());
+    splitPlanner.addSplits(splits, IcebergEnumeratorPosition.of(1L, 1L));
+    enumeratorContext.triggerAllActions();
+
+    // the split must not be assigned to the now-unregistered reader; it stays pending instead
+    Assert.assertFalse(enumeratorContext.getSplitAssignments().containsKey(2));
+    List<String> pendingSplitIds = enumerator.snapshotState(1).pendingSplits().stream()
+        .map(IcebergSourceSplitState::split)
+        .map(IcebergSourceSplit::splitId)
+        .collect(Collectors.toList());
+    Assert.assertEquals(splits.size(), pendingSplitIds.size());
+    Assert.assertEquals(splits.get(0).splitId(), pendingSplitIds.get(0));
+
+    // register the reader again, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2,
+        new SplitRequestEvent());
+
+    // once the reader is back, the pending split is assigned to it
+    Assert.assertTrue(enumerator.snapshotState(2).pendingSplits().isEmpty());
+    MatcherAssert.assertThat(enumeratorContext.getSplitAssignments().get(2).getAssignedSplits(),
+        CoreMatchers.hasItem(splits.get(0)));
+  }
+
+  // Builds an enumerator with an empty SimpleSplitAssigner and no restored state,
+  // and starts it so the periodic discovery action is registered with the testing context.
+  private static ContinuousIcebergEnumerator createEnumerator(
+      SplitEnumeratorContext<IcebergSourceSplit> context,
+      ScanContext scanContext,
+      ContinuousSplitPlanner splitPlanner) {
+
+    ContinuousIcebergEnumerator enumerator =
+        new ContinuousIcebergEnumerator(
+            context,
+            new SimpleSplitAssigner(Collections.emptyList()),
+            scanContext,
+            splitPlanner,
+            null);
+    enumerator.start();
+    return enumerator;
+  }
+
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
new file mode 100644
index 0000000000..05abe7bc17
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.enumerator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Round-trip (serialize then deserialize) tests for {@link IcebergEnumeratorStateSerializer}.
+ */
+public class TestIcebergEnumeratorStateSerializer {
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private final IcebergEnumeratorStateSerializer serializer = IcebergEnumeratorStateSerializer.INSTANCE;
+
+  @Test
+  public void testEmptySnapshotIdAndPendingSplits() throws Exception {
+    // no position and no pending splits: the minimal state
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(Collections.emptyList());
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndEmptyPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(1L, System.currentTimeMillis());
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(position, Collections.emptyList());
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  @Test
+  public void testSomeSnapshotIdAndPendingSplits() throws Exception {
+    IcebergEnumeratorPosition position = IcebergEnumeratorPosition.of(2L, System.currentTimeMillis());
+    List<IcebergSourceSplit> splits = SplitHelpers
+        .createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 3, 1);
+    // cover all three split statuses in one round trip
+    Collection<IcebergSourceSplitState> pendingSplits = Lists.newArrayList();
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(0), IcebergSourceSplitStatus.UNASSIGNED));
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(1), IcebergSourceSplitStatus.ASSIGNED));
+    pendingSplits.add(new IcebergSourceSplitState(splits.get(2), IcebergSourceSplitStatus.COMPLETED));
+
+    IcebergEnumeratorState enumeratorState = new IcebergEnumeratorState(position, pendingSplits);
+    byte[] result = serializer.serialize(enumeratorState);
+    IcebergEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), result);
+    assertEnumeratorStateEquals(enumeratorState, deserialized);
+  }
+
+  // Asserts position equality plus split-by-split equality (id, offsets, status) in iteration order.
+  private void assertEnumeratorStateEquals(IcebergEnumeratorState expected, IcebergEnumeratorState actual) {
+    Assert.assertEquals(expected.lastEnumeratedPosition(), actual.lastEnumeratedPosition());
+    Assert.assertEquals(expected.pendingSplits().size(), actual.pendingSplits().size());
+    Iterator<IcebergSourceSplitState> expectedIterator = expected.pendingSplits().iterator();
+    Iterator<IcebergSourceSplitState> actualIterator = actual.pendingSplits().iterator();
+    for (int i = 0; i < expected.pendingSplits().size(); ++i) {
+      IcebergSourceSplitState expectedSplitState = expectedIterator.next();
+      IcebergSourceSplitState actualSplitState = actualIterator.next();
+      Assert.assertEquals(expectedSplitState.split().splitId(), actualSplitState.split().splitId());
+      Assert.assertEquals(expectedSplitState.split().fileOffset(), actualSplitState.split().fileOffset());
+      Assert.assertEquals(expectedSplitState.split().recordOffset(), actualSplitState.split().recordOffset());
+      Assert.assertEquals(expectedSplitState.status(), actualSplitState.status());
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 98c5fa3ede..8c9be862bf 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -150,6 +150,7 @@ abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourc
     if (availableFuture.get() != null) {
       return;
     }
+
     CompletableFuture<Void> future = assigner.isAvailable()
         .thenAccept(ignore ->
             // Must run assignSplits in coordinator thread
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
index 0f287864be..8d33b6f073 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/StaticIcebergEnumerator.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * One-time split enumeration at the start-up
+ * One-time split enumeration at the start-up for batch execution
  */
 @Internal
 public class StaticIcebergEnumerator extends AbstractIcebergEnumerator {