You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "xinyuiscool (via GitHub)" <gi...@apache.org> on 2023/02/24 19:28:52 UTC

[GitHub] [beam] xinyuiscool commented on a diff in pull request #25525: [Flink Runner] Add new Source classes that are based on FLIP-27 Source API.

xinyuiscool commented on code in PR #25525:
URL: https://github.com/apache/beam/pull/25525#discussion_r1117439611


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SourceReader SourceReader} implementation
+ * that reads from the assigned {@link
+ * org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit
+ * FlinkSourceSplits} by using Beam {@link org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
+ * UnboundedReaders}.
+ *
+ * <p>This reader consumes all the assigned source splits concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ *     org.apache.beam.sdk.io.UnboundedSource.UnboundedReader UnboundedReader}.
+ */
+public class FlinkUnboundedSourceReader<T>
+    extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);
+  // This name is defined in FLIP-33.
+  @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
+  private static final long SLEEP_ON_IDLE_MS = 50L;
+  private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef;
+  private final List<ReaderAndOutput> readers;
+  private int currentReaderIndex;
+  private volatile boolean shouldEmitWatermark;
+
+  public FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
+    super(context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @VisibleForTesting
+  protected FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      ScheduledExecutorService executor,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
+    super(executor, context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @Override
+  public void start() {
+    createPendingBytesGauge(context);
+    Long watermarkInterval =
+        pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
+    if (watermarkInterval != null) {
+      scheduleTaskAtFixedRate(
+          () -> {
+            // Set the watermark emission flag first.
+            shouldEmitWatermark = true;
+            // Wake up the main thread if necessary.
+            CompletableFuture<Void> f = dataAvailableFutureRef.get();
+            if (f != DUMMY_FUTURE) {
+              f.complete(null);
+            }
+          },
+          watermarkInterval,
+          watermarkInterval);
+    } else {
+      LOG.warn("AutoWatermarkInterval is not set, watermarks won't be emitted.");
+    }
+  }
+
+  @Override
+  public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
+      throws Exception {
+    checkExceptionAndMaybeThrow();
+    maybeEmitWatermark();
+    maybeCreateReaderForNewSplits();
+
+    ReaderAndOutput reader = nextReaderWithData();
+    if (reader != null) {
+      emitRecord(reader, output);
+      return InputStatus.MORE_AVAILABLE;
+    } else {
+      LOG.trace("No data available for now.");
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+  }
+
+  /**
+   * Check whether there are data available from alive readers. If not, set a future and wait for
+   * the periodically running wake-up task to complete that future when the check interval passes.
+   * This method is only called by the main thread, which is the only thread writing to the future
+   * ref. Note that for UnboundedSource, because the splits never finish, there are always alive
+   * readers after the first split assignment. Hence, the return value of {@link
+   * FlinkSourceReaderBase#isAvailable()} will effectively be determined by this method after the
+   * first split assignment.
+   */
+  @Override
+  protected CompletableFuture<Void> isAvailableForAliveReaders() {
+    CompletableFuture<Void> future = dataAvailableFutureRef.get();
+    if (future == DUMMY_FUTURE) {
+      CompletableFuture<Void> newFuture = new CompletableFuture<>();
+      // Need to set the future first to avoid the race condition of missing the watermark emission
+      // notification.
+      dataAvailableFutureRef.set(newFuture);
+      if (shouldEmitWatermark || hasException()) {
+        // There are exception after we set the new future,
+        // immediately complete the future and return.
+        dataAvailableFutureRef.set(DUMMY_FUTURE);
+        newFuture.complete(null);
+      } else {
+        LOG.debug("There is no data available, scheduling the idle reader checker.");
+        scheduleTask(

Review Comment:
   This part is a bit unclear to me. It seems this checker thread will complete the dataAvailableFuture without start() being called. Does that mean the reader will start to poll, or will isAvailable() be invoked again?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates {@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire source split.
+            // So, in case of failure, all the data from this split will be consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+      readerAndOutput.reader.close();
+    }
+    executor.shutdown();
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is available when there
+   * are alive readers. For example, an unbounded source may not have any source split ready for
+   * data emission even if all the sources are still alive. Whereas for the bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses --------------------
+
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, TimeUnit.MILLISECONDS));
+  }
+
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, TimeUnit.MILLISECONDS));
+  }
+
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      exception.get().addSuppressed(e);
+    }
+  }
+
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", exception.get());
+    }
+  }
+
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods ------------------------------
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>

Review Comment:
   Could this method be static?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The base class for {@link FlinkBoundedSource} and {@link FlinkUnboundedSource}.
+ *
+ * @param <T> The data type of the records emitted by the raw Beam sources.
+ * @param <OutputT> The data type of the records emitted by the Flink Source.
+ */
+public abstract class FlinkSource<T, OutputT>
+    implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
+  protected final org.apache.beam.sdk.io.Source<T> beamSource;
+  protected final Boundedness boundedness;
+  protected final SerializablePipelineOptions serializablePipelineOptions;
+
+  private final int numSplits;
+
+  // ----------------- public static methods to construct sources --------------------
+
+  public static <T> FlinkBoundedSource<T> bounded(
+      BoundedSource<T> boundedSource,
+      SerializablePipelineOptions serializablePipelineOptions,
+      int numSplits) {
+    return new FlinkBoundedSource<>(
+        boundedSource, serializablePipelineOptions, Boundedness.BOUNDED, numSplits);
+  }
+
+  public static <T> FlinkUnboundedSource<T> unbounded(
+      UnboundedSource<T, ?> source,
+      SerializablePipelineOptions serializablePipelineOptions,
+      int numSplits) {
+    return new FlinkUnboundedSource<>(source, serializablePipelineOptions, numSplits);
+  }
+
+  public static FlinkBoundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
+    FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults();
+    flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
+    return new FlinkBoundedSource<>(

Review Comment:
   Please add a comment explaining why FlinkBoundedSource is used here to construct an unbounded source.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates {@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire source split.
+            // So, in case of failure, all the data from this split will be consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);

Review Comment:
   If notifyNoMoreSplits() is bound to be called after all splits are added, we only need to complete this future in notifyNoMoreSplits(), right? It's a bit weird that we complete this future in two places and then recreate the future in line 174.
   
   If we only need to complete this future once, then we can mark the variable final so it's easier to understand.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates {@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire source split.
+            // So, in case of failure, all the data from this split will be consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+      readerAndOutput.reader.close();
+    }
+    executor.shutdown();
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is available when there
+   * are alive readers. For example, an unbounded source may not have any source split ready for
+   * data emission even if all the sources are still alive. Whereas for the bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses --------------------
+
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, TimeUnit.MILLISECONDS));
+  }
+
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, TimeUnit.MILLISECONDS));
+  }
+
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      exception.get().addSuppressed(e);
+    }
+  }
+
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", exception.get());
+    }
+  }
+
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods ------------------------------
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
+    UnboundedSource<OutputT, CheckpointMarkT> source =
+        (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    if (beamSource instanceof BoundedSource) {
+      return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
+    } else if (beamSource instanceof UnboundedSource) {
+      return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
+    } else {
+      throw new IllegalStateException("Unknown source type " + beamSource.getClass());
+    }
+  }
+
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>

Review Comment:
   Can this method be static?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Flink {@link org.apache.flink.api.connector.source.SplitEnumerator SplitEnumerator}
+ * implementation that holds a Beam {@link Source} and does the following:
+ *
+ * <ul>
+ *   <li>Split the Beam {@link Source} to desired number of splits.
+ *   <li>Assign the splits to the Flink Source Reader.
+ * </ul>
+ *
+ * <p>Note that at this point, this class has a static round-robin split assignment strategy.
+ *
+ * @param <T> The output type of the encapsulated Beam {@link Source}.
+ */
+public class FlinkSourceSplitEnumerator<T>
+    implements SplitEnumeratorCompat<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
+  private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
+  private final Source<T> beamSource;
+  private final PipelineOptions pipelineOptions;
+  private final int numSplits;
+  private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
+  private boolean splitsInitialized;
+
+  public FlinkSourceSplitEnumerator(
+      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
+      Source<T> beamSource,
+      PipelineOptions pipelineOptions,
+      int numSplits) {
+    this.context = context;
+    this.beamSource = beamSource;
+    this.pipelineOptions = pipelineOptions;
+    this.numSplits = numSplits;
+    this.pendingSplits = new HashMap<>(numSplits);
+    this.splitsInitialized = false;
+  }
+
+  @Override
+  public void start() {
+    context.callAsync(
+        () -> {
+          try {
+            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
+            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
+            int i = 0;
+            for (Source<T> beamSplitSource : beamSplitSourceList) {
+              int targetSubtask = i % context.currentParallelism();
+              List<FlinkSourceSplit<T>> splitsForTask =
+                  flinkSourceSplitsList.computeIfAbsent(
+                      targetSubtask, ignored -> new ArrayList<>());
+              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
+              i++;
+            }
+            return flinkSourceSplitsList;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        },
+        (sourceSplits, error) -> {
+          if (error != null) {
+            throw new RuntimeException("Failed to start source enumerator.", error);
+          } else {
+            pendingSplits.putAll(sourceSplits);
+            splitsInitialized = true;
+            sendPendingSplitsToSourceReaders();
+          }
+        });
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+    // Not used.
+  }
+
+  @Override
+  public void addSplitsBack(List<FlinkSourceSplit<T>> splits, int subtaskId) {
+    LOG.info("Adding splits {} back from subtask {}", splits, subtaskId);
+    List<FlinkSourceSplit<T>> splitsForSubtask =
+        pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
+    splitsForSubtask.addAll(splits);
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    List<FlinkSourceSplit<T>> splitsForSubtask = pendingSplits.remove(subtaskId);
+    if (splitsForSubtask != null) {
+      assignSplitsAndLog(splitsForSubtask, subtaskId);
+      pendingSplits.remove(subtaskId);
+    } else {
+      if (splitsInitialized) {
+        LOG.info("There is no split for subtask {}. Signaling no more splits.", subtaskId);
+        context.signalNoMoreSplits(subtaskId);
+      }
+    }
+  }
+
+  @Override
+  public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState(long checkpointId) throws Exception {
+    LOG.info("Taking snapshot for checkpoint {}", checkpointId);
+    return snapshotState();
+  }
+
+  @Override
+  public Map<Integer, List<FlinkSourceSplit<T>>> snapshotState() throws Exception {
+    return pendingSplits;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // NoOp
+  }
+
+  // -------------- Private helper methods ----------------------
+  private List<? extends Source<T>> splitBeamSource() throws Exception {
+    if (beamSource instanceof BoundedSource) {
+      BoundedSource<T> boundedSource = (BoundedSource<T>) beamSource;
+      long desiredSizeBytes = boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits;
+      return boundedSource.split(desiredSizeBytes, pipelineOptions);
+    } else if (beamSource instanceof UnboundedSource) {
+      return ((UnboundedSource<T, ?>) beamSource).split(numSplits, pipelineOptions);
+    } else {
+      throw new IllegalStateException("Unknown source type " + beamSource.getClass());
+    }
+  }
+
+  private void sendPendingSplitsToSourceReaders() {
+    Iterator<Map.Entry<Integer, List<FlinkSourceSplit<T>>>> splitIter =
+        pendingSplits.entrySet().iterator();
+    while (splitIter.hasNext()) {
+      Map.Entry<Integer, List<FlinkSourceSplit<T>>> entry = splitIter.next();
+      int readerIndex = entry.getKey();
+      int targetSubtask = readerIndex % context.currentParallelism();

Review Comment:
   Reading the code above, it seems the map key is already the target subtask, so maybe we don't need to apply `% context.currentParallelism()` again here?



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract implementation of {@link SourceReader} which encapsulates {@link Source Beam Sources}
+ * for data reading.
+ *
+ * <ol>
+ *   <li>Idle timeout support.
+ *   <li>Splits addition handling.
+ *   <li>Split reader creation and management.
+ *   <li>checkpoint management
+ * </ol>
+ *
+ * <p>This implementation provides unified logic for both {@link BoundedSource} and {@link
+ * UnboundedSource}. The subclasses are expected to only implement the {@link
+ * #pollNext(ReaderOutput)} method.
+ *
+ * @param <OutputT> the output element type from the encapsulated {@link Source Beam sources.}
+ */
+public abstract class FlinkSourceReaderBase<T, OutputT>
+    implements SourceReader<OutputT, FlinkSourceSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
+  protected static final CompletableFuture<Void> AVAILABLE_NOW =
+      CompletableFuture.completedFuture(null);
+  // Some dummy instances to make the annotation checker happy with AtomicReference.
+  protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
+  protected static final Exception NO_EXCEPTION = new Exception();
+
+  protected final PipelineOptions pipelineOptions;
+  protected final @Nullable Function<OutputT, Long> timestampExtractor;
+  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();
+  // This needs to be a ConcurrentHashMap because the metric retrieving thread may access it.
+  private final ConcurrentMap<Integer, ReaderAndOutput> beamSourceReaders;
+  protected final SourceReaderContext context;
+  private final ScheduledExecutorService executor;
+
+  protected final Counter numRecordsInCounter;
+  protected final long idleTimeoutMs;
+  private final CompletableFuture<Void> idleTimeoutFuture;
+  private final AtomicReference<Throwable> exception;
+  private boolean idleTimeoutCountingDown;
+  private CompletableFuture<Void> waitingForSplitChangeFuture;
+  private boolean noMoreSplits;
+
+  protected FlinkSourceReaderBase(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this(
+        Executors.newSingleThreadScheduledExecutor(
+            r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())),
+        context,
+        pipelineOptions,
+        timestampExtractor);
+  }
+
+  protected FlinkSourceReaderBase(
+      ScheduledExecutorService executor,
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<OutputT, Long> timestampExtractor) {
+    this.context = context;
+    this.pipelineOptions = pipelineOptions;
+    this.timestampExtractor = timestampExtractor;
+    this.beamSourceReaders = new ConcurrentHashMap<>();
+    this.exception = new AtomicReference<>(NO_EXCEPTION);
+    this.executor = executor;
+    this.idleTimeoutMs =
+        pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
+    this.idleTimeoutFuture = new CompletableFuture<>();
+    this.waitingForSplitChangeFuture = new CompletableFuture<>();
+    this.idleTimeoutCountingDown = false;
+    // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is
+    // upgraded to 1.14 and above.
+    this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
+    checkExceptionAndMaybeThrow();
+    // Add all the source splits whose readers haven't been created.
+    List<FlinkSourceSplit<T>> splitsState = new ArrayList<>(sourceSplits);
+
+    // Add all the source splits being actively read.
+    beamSourceReaders.forEach(
+        (splitId, readerAndOutput) -> {
+          Source.Reader<T> reader = readerAndOutput.reader;
+          if (reader instanceof BoundedSource.BoundedReader) {
+            // Sometimes users may decide to run a bounded source in streaming mode as "finite
+            // stream."
+            // For bounded source, the checkpoint granularity is the entire source split.
+            // So, in case of failure, all the data from this split will be consumed again.
+            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
+          } else if (reader instanceof UnboundedSource.UnboundedReader) {
+            // The checkpoint for unbounded sources is fine granular.
+            byte[] checkpointState =
+                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
+            splitsState.add(
+                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          }
+        });
+    return splitsState;
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    checkExceptionAndMaybeThrow();
+    if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) {
+      // There are still live readers.
+      return isAvailableForAliveReaders();
+    } else if (noMoreSplits) {
+      // All the splits have been read, wait for idle timeout.
+      checkIdleTimeoutAndMaybeStartCountdown();
+      return idleTimeoutFuture;
+    } else {
+      // There is no live readers, waiting for new split assignments or no more splits notification.
+      if (waitingForSplitChangeFuture.isDone()) {
+        waitingForSplitChangeFuture = new CompletableFuture<>();
+      }
+      return waitingForSplitChangeFuture;
+    }
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Received NoMoreSplits signal from enumerator.");
+    noMoreSplits = true;
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void addSplits(List<FlinkSourceSplit<T>> splits) {
+    checkExceptionAndMaybeThrow();
+    LOG.info("Adding splits {}", splits);
+    sourceSplits.addAll(splits);
+    waitingForSplitChangeFuture.complete(null);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) {
+      readerAndOutput.reader.close();
+    }
+    executor.shutdown();
+  }
+
+  // ----------------- protected abstract methods ----------------------
+
+  /**
+   * This method needs to be overridden by subclasses to determine if data is available when there
+   * are alive readers. For example, an unbounded source may not have any source split ready for
+   * data emission even if all the sources are still alive. Whereas for the bounded source, data is
+   * always available as long as there are alive readers.
+   */
+  protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
+
+  // ----------------- protected helper methods for subclasses --------------------
+
+  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
+    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
+    if (sourceSplit != null) {
+      Source.Reader<T> reader = createReader(sourceSplit);
+      ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);
+      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
+      return Optional.of(readerAndOutput);
+    }
+    return Optional.empty();
+  }
+
+  protected void finishSplit(int splitIndex) throws IOException {
+    ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
+    if (readerAndOutput != null) {
+      LOG.info("Finished reading from split {}", readerAndOutput.splitId);
+      readerAndOutput.reader.close();
+    } else {
+      throw new IllegalStateException(
+          "SourceReader for split " + splitIndex + " should never be null!");
+    }
+  }
+
+  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+    if (idleTimeoutMs <= 0) {
+      idleTimeoutFuture.complete(null);
+    } else if (!idleTimeoutCountingDown) {
+      scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs);
+      idleTimeoutCountingDown = true;
+    }
+    return idleTimeoutFuture.isDone();
+  }
+
+  protected boolean noMoreSplits() {
+    return noMoreSplits;
+  }
+
+  protected void scheduleTask(Runnable runnable, long delayMs) {
+    ignoreReturnValue(
+        executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, TimeUnit.MILLISECONDS));
+  }
+
+  protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs) {
+    ignoreReturnValue(
+        executor.scheduleAtFixedRate(
+            new ErrorRecordingRunnable(runnable), delayMs, periodMs, TimeUnit.MILLISECONDS));
+  }
+
+  protected void execute(Runnable runnable) {
+    executor.execute(new ErrorRecordingRunnable(runnable));
+  }
+
+  protected void recordException(Throwable e) {
+    if (!exception.compareAndSet(NO_EXCEPTION, e)) {
+      exception.get().addSuppressed(e);
+    }
+  }
+
+  protected void checkExceptionAndMaybeThrow() {
+    if (exception.get() != NO_EXCEPTION) {
+      throw new RuntimeException("The source reader received exception.", exception.get());
+    }
+  }
+
+  protected boolean hasException() {
+    return exception.get() != NO_EXCEPTION;
+  }
+
+  protected Collection<FlinkSourceSplit<T>> sourceSplits() {
+    return Collections.unmodifiableCollection(sourceSplits);
+  }
+
+  protected Map<Integer, ReaderAndOutput> allReaders() {
+    return Collections.unmodifiableMap(beamSourceReaders);
+  }
+
+  protected static void ignoreReturnValue(Object o) {
+    // do nothing.
+  }
+  // ------------------------------ private methods ------------------------------
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
+    UnboundedSource<OutputT, CheckpointMarkT> source =
+        (UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)

Review Comment:
   Can createReader() be static?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org