Posted to github@beam.apache.org by "mosche (via GitHub)" <gi...@apache.org> on 2023/04/04 10:38:02 UTC

[GitHub] [beam] mosche commented on a diff in pull request #23540: Kinesis enhanced fanout

mosche commented on code in PR #23540:
URL: https://github.com/apache/beam/pull/23540#discussion_r1156763384


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+class EFOKinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOKinesisReader.class);
+
+  private final KinesisIO.Read spec;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+  private final KinesisSource source;
+  private final KinesisReaderCheckpoint initCheckpoint;
+
+  private @Nullable KinesisRecord currentRecord = null;
+  private @Nullable EFOShardSubscribersPool shardSubscribersPool = null;
+
+  EFOKinesisReader(
+      KinesisIO.Read spec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      KinesisReaderCheckpoint initCheckpoint,
+      KinesisSource source) {
+    this.spec = checkArgumentNotNull(spec);

Review Comment:
   We should be able to rely on the nullness checker here. These parameters are all non-nullable, so you could remove `checkArgumentNotNull`.
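   For illustration, a minimal sketch of the constructor once the runtime checks are dropped and the nullness checker enforces non-null arguments at compile time (field and parameter names taken from the quoted diff):
   ```java
   // Sketch only: the checker framework rejects null arguments at compile time,
   // so the runtime checkArgumentNotNull calls become redundant.
   EFOKinesisReader(
       KinesisIO.Read spec,
       String consumerArn,
       KinesisAsyncClient kinesis,
       KinesisReaderCheckpoint initCheckpoint,
       KinesisSource source) {
     this.spec = spec;
     this.consumerArn = consumerArn;
     this.kinesis = kinesis;
     this.initCheckpoint = initCheckpoint;
     this.source = source;
   }
   ```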



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+class EFOKinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOKinesisReader.class);
+
+  private final KinesisIO.Read spec;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+  private final KinesisSource source;
+  private final KinesisReaderCheckpoint initCheckpoint;
+
+  private @Nullable KinesisRecord currentRecord = null;
+  private @Nullable EFOShardSubscribersPool shardSubscribersPool = null;
+
+  EFOKinesisReader(
+      KinesisIO.Read spec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      KinesisReaderCheckpoint initCheckpoint,
+      KinesisSource source) {
+    this.spec = checkArgumentNotNull(spec);
+    this.consumerArn = checkArgumentNotNull(consumerArn);
+    this.kinesis = checkArgumentNotNull(kinesis);
+    this.initCheckpoint = checkArgumentNotNull(initCheckpoint);
+    this.source = source;
+  }
+
+  @Override
+  @SuppressWarnings("dereference.of.nullable")
+  public boolean start() throws IOException {
+    LOG.info("Starting reader using {}", initCheckpoint);
+    try {
+      shardSubscribersPool = createPool();
+      shardSubscribersPool.start(initCheckpoint);

Review Comment:
   So you can remove `@SuppressWarnings`:
   ```suggestion
         shardSubscribersPool().start(initCheckpoint);
   ```
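   For illustration, a hypothetical `shardSubscribersPool()` accessor (not in the quoted diff) that the suggestion above assumes; `checkStateNotNull` is already statically imported in this file:
   ```java
   // Hypothetical helper matching the suggestion: returns the pool as non-null,
   // or fails fast if start() has not initialized it yet.
   private EFOShardSubscribersPool shardSubscribersPool() {
     return checkStateNotNull(shardSubscribersPool);
   }
   ```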



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;

Review Comment:
   Nitpick: considering this is just a constant at this point, how about moving it into the subscriber to make things more transparent?
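   For example, a sketch of what that could look like in `EFOShardSubscriber` (keeping the constant name from the pool, which is an assumption):
   ```java
   // Sketch: the cool-down default becomes an implementation detail of the subscriber
   // instead of being threaded through the pool's constructors.
   private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
   ```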



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.INITIALIZED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.PAUSED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.RUNNING;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.STOPPED;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import io.netty.channel.ChannelException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+class EFOShardSubscriber {
+  enum State {
+    INITIALIZED, // Initialized, but not started yet
+    RUNNING, // Subscriber started
+    PAUSED, // Subscriber paused due to backpressure
+    STOPPED // Subscriber stopped
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
+  private static final Integer IN_FLIGHT_LIMIT = 10;
+
+  private final EFOShardSubscribersPool pool;
+  private final String consumerArn;
+
+  // Shard id of this subscriber
+  private final String shardId;
+
+  private final KinesisAsyncClient kinesis;
+
+  /** Internal subscriber state. */
+  private volatile State state = INITIALIZED;
+
+  /**
+   * Kept only for cases when a subscription starts and then fails with a non-critical error, before
+   * any event updates {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private @MonotonicNonNull StartingPosition initialPosition;
+
+  /**
+   * Completes once this shard subscriber is done, either normally (stopped or shard is completely
+   * consumed) or exceptionally due to a non retry-able error.
+   */
+  private final CompletableFuture<Void> done = new CompletableFuture<>();
+
+  private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();
+
+  /** Tracks number of delivered events in flight (until ack-ed). */
+  private final AtomicInteger inFlight = new AtomicInteger();
+
+  /**
+   * Async completion handler for {@link KinesisAsyncClient#subscribeToShard} that:
+   * <li>exits immediately if {@link #done} is already completed (exceptionally),
+   * <li>re-subscribes at {@link ShardEventsSubscriber#sequenceNumber} for retryable errors such as
+   *     retryable {@link SdkException}, {@link ClosedChannelException}, {@link ChannelException},
+   *     {@link TimeoutException} (any of these might be wrapped in {@link CompletionException}s)
+   * <li>or completes {@link #done} exceptionally for any other error,
+   * <li>completes {@link #done} normally if subscriber {@link #state} is {@link State#STOPPED} or
+   *     if shard completed (no further {@link ShardEventsSubscriber#sequenceNumber}),
+   * <li>or otherwise re-subscribes at {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private final BiConsumer<Void, Throwable> reSubscriptionHandler;
+
+  private static boolean isRetryable(Throwable error) {
+    Throwable cause = unwrapCompletionException(error);
+    if (cause instanceof SdkException && ((SdkException) cause).retryable()) {
+      return true; // retryable SDK exception
+    }
+    // check the root cause for issues that can be addressed using retries
+    cause = Throwables.getRootCause(cause);
+    return cause instanceof ClosedChannelException // Java Nio
+        || cause instanceof TimeoutException // Java
+        || cause instanceof ChannelException; // Netty (e.g. ReadTimeoutException)
+  }
+
+  /** Loops through completion exceptions until we get the underlying cause. */
+  private static Throwable unwrapCompletionException(Throwable completionException) {
+    Throwable current = completionException;
+    while (current instanceof CompletionException) {
+      Throwable cause = current.getCause();
+      if (cause != null) {
+        current = cause;
+      } else {
+        return current;
+      }
+    }
+    return current;
+  }
+
+  @SuppressWarnings({"FutureReturnValueIgnored", "all"})
+  EFOShardSubscriber(
+      EFOShardSubscribersPool pool,
+      String shardId,
+      KinesisIO.Read read,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.pool = pool;
+    this.consumerArn = consumerArn;
+    this.shardId = shardId;
+    this.kinesis = kinesis;
+    this.reSubscriptionHandler =
+        (Void unused, Throwable error) -> {
+          eventsSubscriber.cancel();
+
+          if (error != null && !isRetryable(error)) {
+            done.completeExceptionally(error);
+            return;
+          }
+
+          if (error != null && isRetryable(error) && state != STOPPED) {
+            String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
+            if (inFlight.get() == IN_FLIGHT_LIMIT) {
+              state = PAUSED;
+            } else {
+              if (lastContinuationSequenceNumber != null) {
+                pool.delayedTask(
+                    () -> internalReSubscribe(lastContinuationSequenceNumber), onErrorCoolDownMs);
+              } else {
+                pool.delayedTask(() -> internalSubscribe(initialPosition), onErrorCoolDownMs);
+              }
+            }
+            return;
+          }
+
+          String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
+
+          // happy-path re-subscribe, subscription was completed by the SDK after 5 mins
+          if (error == null && state != STOPPED && lastContinuationSequenceNumber != null) {
+            pool.delayedTask(() -> internalReSubscribe(lastContinuationSequenceNumber), 0);

Review Comment:
   Nitpick: why use `delayedTask` with a delay of 0 instead of calling `internalReSubscribe` directly?
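   In other words, a sketch of the zero-delay branch calling the method directly (behavior otherwise unchanged; the returned future is ignored as elsewhere in this handler):
   ```java
   // Happy-path re-subscribe: the SDK completed the subscription normally after ~5 minutes,
   // so the scheduler round-trip with a 0 ms delay could be skipped.
   if (error == null && state != STOPPED && lastContinuationSequenceNumber != null) {
     internalReSubscribe(lastContinuationSequenceNumber);
     return;
   }
   ```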



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;

Review Comment:
   Nitpick: if this is just for correlation in logs, UUIDs are overkill and hard to read. You could generate a short (3-character?) id using `RandomStringUtils`, or even use incremental ids from a static `AtomicInteger`.
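   A minimal sketch of the incremental-id variant (the `POOL_COUNTER` field is an assumption, not code from the PR; it needs an extra `java.util.concurrent.atomic.AtomicInteger` import):
   ```java
   // Hypothetical: short, human-readable pool ids for log correlation instead of UUIDs.
   private static final AtomicInteger POOL_COUNTER = new AtomicInteger();

   private final String poolId = String.valueOf(POOL_COUNTER.incrementAndGet());
   ```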



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.INITIALIZED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.PAUSED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.RUNNING;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.STOPPED;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import io.netty.channel.ChannelException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+class EFOShardSubscriber {
+  enum State {
+    INITIALIZED, // Initialized, but not started yet
+    RUNNING, // Subscriber started
+    PAUSED, // Subscriber paused due to backpressure
+    STOPPED // Subscriber stopped
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
+  private static final Integer IN_FLIGHT_LIMIT = 10;
+
+  private final EFOShardSubscribersPool pool;
+  private final String consumerArn;
+
+  // Shard id of this subscriber
+  private final String shardId;
+
+  private final KinesisAsyncClient kinesis;
+
+  /** Internal subscriber state. */
+  private volatile State state = INITIALIZED;
+
+  /**
+   * Kept only for cases when a subscription starts and then fails with a non-critical error, before
+   * any event updates {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private @MonotonicNonNull StartingPosition initialPosition;
+
+  /**
+   * Completes once this shard subscriber is done, either normally (stopped or shard is completely
+   * consumed) or exceptionally due to a non retry-able error.
+   */
+  private final CompletableFuture<Void> done = new CompletableFuture<>();
+
+  private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();
+
+  /** Tracks number of delivered events in flight (until ack-ed). */
+  private final AtomicInteger inFlight = new AtomicInteger();
+
+  /**
+   * Async completion handler for {@link KinesisAsyncClient#subscribeToShard} that:
+   * <li>exits immediately if {@link #done} is already completed (exceptionally),
+   * <li>re-subscribes at {@link ShardEventsSubscriber#sequenceNumber} for retryable errors such as
+   *     retryable {@link SdkException}, {@link ClosedChannelException}, {@link ChannelException},
+   *     {@link TimeoutException} (any of these might be wrapped in {@link CompletionException}s)
+   * <li>or completes {@link #done} exceptionally for any other error,
+   * <li>completes {@link #done} normally if subscriber {@link #state} is {@link State#STOPPED} or
+   *     if shard completed (no further {@link ShardEventsSubscriber#sequenceNumber}),
+   * <li>or otherwise re-subscribes at {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private final BiConsumer<Void, Throwable> reSubscriptionHandler;
+
+  private static boolean isRetryable(Throwable error) {
+    Throwable cause = unwrapCompletionException(error);
+    if (cause instanceof SdkException && ((SdkException) cause).retryable()) {
+      return true; // retryable SDK exception
+    }
+    // check the root cause for issues that can be addressed using retries
+    cause = Throwables.getRootCause(cause);
+    return cause instanceof ClosedChannelException // Java Nio
+        || cause instanceof TimeoutException // Java
+        || cause instanceof ChannelException; // Netty (e.g. ReadTimeoutException)
+  }
+
+  /** Loops through completion exceptions until we get the underlying cause. */
+  private static Throwable unwrapCompletionException(Throwable completionException) {
+    Throwable current = completionException;
+    while (current instanceof CompletionException) {
+      Throwable cause = current.getCause();
+      if (cause != null) {
+        current = cause;
+      } else {
+        return current;
+      }
+    }
+    return current;
+  }
+
+  @SuppressWarnings({"FutureReturnValueIgnored", "all"})
+  EFOShardSubscriber(
+      EFOShardSubscribersPool pool,
+      String shardId,
+      KinesisIO.Read read,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.pool = pool;
+    this.consumerArn = consumerArn;
+    this.shardId = shardId;
+    this.kinesis = kinesis;
+    this.reSubscriptionHandler =
+        (Void unused, Throwable error) -> {
+          eventsSubscriber.cancel();
+
+          if (error != null && !isRetryable(error)) {
+            done.completeExceptionally(error);
+            return;
+          }
+
+          if (error != null && isRetryable(error) && state != STOPPED) {
+            String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
+            if (inFlight.get() == IN_FLIGHT_LIMIT) {
+              state = PAUSED;
+            } else {
+              if (lastContinuationSequenceNumber != null) {
+                pool.delayedTask(
+                    () -> internalReSubscribe(lastContinuationSequenceNumber), onErrorCoolDownMs);
+              } else {
+                pool.delayedTask(() -> internalSubscribe(initialPosition), onErrorCoolDownMs);
+              }
+            }
+            return;
+          }
+
+          String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
+
+          // happy-path re-subscribe, subscription was completed by the SDK after 5 mins
+          if (error == null && state != STOPPED && lastContinuationSequenceNumber != null) {
+            pool.delayedTask(() -> internalReSubscribe(lastContinuationSequenceNumber), 0);
+            return;
+          }
+
+          // shard is fully consumed - re-shard happened
+          if (error == null && state != STOPPED && lastContinuationSequenceNumber == null) {
+            done.complete(null);
+            return;
+          }
+
+          String msg =
+              String.format(
+                  "Unknown case which is likely a bug: state=%s seqnum=%s",
+                  state, lastContinuationSequenceNumber);
+          LOG.warn(msg);
+          done.completeExceptionally(new IllegalStateException(msg));
+        };
+  }
+
+  /**
+   * Subscribes to shard {@link #shardId} at starting position and automatically re-subscribes when
+   * necessary using {@link #reSubscriptionHandler}.
+   *
+   * <p>Note:
+   * <li>{@link #subscribe} may only ever be invoked once by an external caller.
+   * <li>The re-subscription is hidden from the external caller. To the outside it looks this
+   *     subscriber is always subscribed to the shard once {@link #subscribe} was called.
+   *
+   * @return {@link #done} to signal completion of this subscriber, normally (stopped or shard is
+   *     completely consumed) or exceptionally due to a non retryable error.
+   */
+  CompletableFuture<Void> subscribe(StartingPosition position) {
+    checkState(state == INITIALIZED, "Subscriber was already started");
+    initialPosition = position;
+    return internalSubscribe(position);
+  }
+
+  private CompletableFuture<Void> internalReSubscribe(String sequenceNumber) {
+    return internalSubscribe(
+        StartingPosition.builder()
+            .type(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+            .sequenceNumber(sequenceNumber)
+            .build());
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private CompletableFuture<Void> internalSubscribe(StartingPosition position) {
+    SubscribeToShardRequest request = subscribeRequest(position);
+    LOG.info("Pool {} Shard {} starting subscribe request {}", pool.getPoolId(), shardId, request);
+    try {
+      kinesis.subscribeToShard(request, responseHandler()).whenComplete(reSubscriptionHandler);
+      return done;
+    } catch (Exception e) {
+      done.completeExceptionally(e);
+      return done;
+    }
+  }
+
+  private SubscribeToShardRequest subscribeRequest(StartingPosition position) {
+    return SubscribeToShardRequest.builder()
+        .consumerARN(consumerArn)
+        .shardId(shardId)
+        .startingPosition(position)
+        .build();
+  }
+
+  private SubscribeToShardResponseHandler responseHandler() {
+    return SubscribeToShardResponseHandler.builder().subscriber(() -> eventsSubscriber).build();
+  }
+
+  /**
+   * Cancels shard subscriber. Sets {@link #state} to {@link State#STOPPED} and invokes {@link
+   * ShardEventsSubscriber#cancel()} if defined.
+   */
+  void cancel() {
+    LOG.info("Pool {} Shard {} cancelling", pool.getPoolId(), shardId);
+    if (state != STOPPED && eventsSubscriber != null) {
+      eventsSubscriber.cancel();
+      state = STOPPED;
+    }
+  }
+
+  /**
+   * Decrements events {@link #inFlight} and, if previously at the limit, requests a next event from
+   * {@link ShardEventsSubscriber#subscription} (if active).
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  void ackEvent() {
+    int prevInFlight = inFlight.getAndDecrement();
+    if (state == PAUSED) {
+      state = RUNNING;
+      internalReSubscribe(checkStateNotNull(eventsSubscriber.sequenceNumber));
+    } else if (prevInFlight == IN_FLIGHT_LIMIT) {
+      Subscription s = eventsSubscriber.subscription;
+      if (s != null) {
+        s.request(1);
+      }
+    }
+  }
+
+  /** Subscriber to the Kinesis event stream. */
+  private class ShardEventsSubscriber
+      implements Subscriber<SubscribeToShardEventStream>, SubscribeToShardResponseHandler.Visitor {
+    /** Tracks continuation sequence number. */
+    @Nullable String sequenceNumber;
+    /** Current active subscription to request more events or cancel it. */
+    @Nullable Subscription subscription;
+
+    /** Cancels {@link #subscription}. */
+    void cancel() {
+      if (subscription != null) {
+        subscription.cancel();
+      }
+      subscription = null;
+    }
+
+    /**
+     * Handles new established {@link Subscription}.
+     *
+     * <p>Cancels subscription immediately if {@link EFOShardSubscriber#state} is {@link
+     * State#STOPPED} already. Otherwise, if below the {@link #inFlight} limit, the first event is
+     * requested.
+     */
+    @Override
+    public void onSubscribe(Subscription subscription) {
+      this.subscription = subscription;
+      if (state == STOPPED) {
+        cancel();
+      } else if (inFlight.get() < IN_FLIGHT_LIMIT) {
+        subscription.request(1);
+      }
+    }
+
+    /**
+     * Handles {@link SubscribeToShardEvent} and forwards it to {@link
+     * EFOShardSubscribersPool#enqueueEvent}.
+     *
+     * <p>This increments {@link #inFlight} and immediately {@link Subscription#request requests}
+     * another event if below the {@link #inFlight} limit. Otherwise this is done on the next {@link
+     * #ackEvent()}.
+     *
+     * <p>This continuously updates {@link #sequenceNumber}. In case of any exception, {@link

Review Comment:
   Please remove the 2nd sentence; it looks like we've changed the initial behavior here.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);
+            return kinesisRecord;
+          }
+          // Make sure to update shard state accordingly if `current` does not contain any more
+          // events. This is necessary to account for any re-sharding, so we could correctly resume
+          // from a checkpoint if taken once we advanced to the record returned by getNextRecord().
+          if (!current.hasNext()) {
+            onEventDone(shardState, current);
+            current = null;
+          }
+        } else {
+          onEventDone(shardState, current);
+          current = null;
+        }
+      } else if (subscriptionError != null) {
+        stop();
+        throw new IOException(subscriptionError);
+      } else {
+        return null; // no record available, queue is empty
+      }
+    }
+  }
+
+  /**
+   * Unsets {@link #current} and updates {@link #state} accordingly.
+   *
+   * <p>If {@link SubscribeToShardEvent#continuationSequenceNumber()} is defined, update {@link
+   * ShardState} accordingly. Otherwise, or if {@link SubscribeToShardEvent#childShards()} exists,
+   * handle re-sharding: remove old shard from {@link #state} and add new ones at TRIM_HORIZON.
+   *
+   * <p>In case of re-sharding, start all new {@link EFOShardSubscriber#subscribe subscriptions}
+   * with the subscription {@link #errorHandler} if there is no {@link #subscriptionError} yet.
+   */
+  private void onEventDone(ShardState shardState, EventRecords noRecordsEvent) {
+    if (noRecordsEvent.event.continuationSequenceNumber() == null
+        && noRecordsEvent.event.hasChildShards()) {
+      LOG.info("Processing re-shard signal {}", noRecordsEvent.event);

Review Comment:
   Nitpick: add the pool id to this log message?
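   E.g. a sketch, matching the log style used elsewhere in this class:
   ```java
   LOG.info("Pool {} Processing re-shard signal {}", poolId, noRecordsEvent.event);
   ```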



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);
+            return kinesisRecord;
+          }
+          // Make sure to update shard state accordingly if `current` does not contain any more
+          // events. This is necessary to account for any re-sharding, so we could correctly resume
+          // from a checkpoint if taken once we advanced to the record returned by getNextRecord().
+          if (!current.hasNext()) {
+            onEventDone(shardState, current);
+            current = null;
+          }
+        } else {
+          onEventDone(shardState, current);
+          current = null;
+        }
+      } else if (subscriptionError != null) {
+        stop();
+        throw new IOException(subscriptionError);
+      } else {
+        return null; // no record available, queue is empty
+      }
+    }
+  }
+
+  /**
+   * Unsets {@link #current} and updates {@link #state} accordingly.
+   *
+   * <p>If {@link SubscribeToShardEvent#continuationSequenceNumber()} is defined, update {@link
+   * ShardState} accordingly. Otherwise, or if {@link SubscribeToShardEvent#childShards()} exists,
+   * handle re-sharding: remove old shard from {@link #state} and add new ones at TRIM_HORIZON.
+   *
+   * <p>In case of re-sharding, start all new {@link EFOShardSubscriber#subscribe subscriptions}
+   * with the subscription {@link #errorHandler} if there is no {@link #subscriptionError} yet.
+   */
+  private void onEventDone(ShardState shardState, EventRecords noRecordsEvent) {
+    if (noRecordsEvent.event.continuationSequenceNumber() == null
+        && noRecordsEvent.event.hasChildShards()) {
+      LOG.info("Processing re-shard signal {}", noRecordsEvent.event);
+      List<String> successorShardsIds = computeSuccessorShardsIds(noRecordsEvent);
+      for (String successorShardId : successorShardsIds) {
+        ShardCheckpoint newCheckpoint =
+            new ShardCheckpoint(
+                read.getStreamName(),
+                successorShardId,
+                new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
+        state.computeIfAbsent(
+            successorShardId,
+            id -> new ShardState(initShardSubscriber(newCheckpoint), newCheckpoint));
+      }
+
+      state.remove(noRecordsEvent.shardId);
+    } else {
+      shardState.update(noRecordsEvent);
+    }
+  }
+
+  /**
+   * Always initialize a new subscriber to make sure checkpoints will be correct. But only start the
+   * subscription if there is no {@link #subscriptionError}.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private EFOShardSubscriber initShardSubscriber(ShardCheckpoint cp) {
+    EFOShardSubscriber subscriber =
+        new EFOShardSubscriber(
+            this, cp.getShardId(), read, consumerArn, kinesis, onErrorCoolDownMs);
+    StartingPosition startingPosition = cp.toEFOStartingPosition();
+    if (subscriptionError == null) {
+      subscriber.subscribe(startingPosition).whenCompleteAsync(errorHandler);
+    }
+    return subscriber;
+  }
+
+  private static List<String> computeSuccessorShardsIds(EventRecords records) {
+    List<String> successorShardsIds = new ArrayList<>();
+    SubscribeToShardEvent event = records.event;
+    for (ChildShard childShard : event.childShards()) {
+      if (childShard.parentShards().contains(records.shardId)) {
+        if (childShard.parentShards().size() > 1) {
+          // This is the case of merging two shards into one.
+          // When there are 2 parent shards, we only pick up the child shard if
+          // the max parent shard ID equals the sender shard ID.
+          String maxParentId = childShard.parentShards().stream().max(String::compareTo).get();
+          if (records.shardId.equals(maxParentId)) {
+            successorShardsIds.add(childShard.shardId());
+          }
+        } else {
+          // This is the case when shard is split - we must add both
+          // and start subscriptions for them.
+          successorShardsIds.add(childShard.shardId());
+        }
+      }
+    }
+
+    if (successorShardsIds.isEmpty()) {
+      LOG.info("Found no successors for shard {}", records.shardId);
+    } else {
+      LOG.info("Found successors for shard {}: {}", records.shardId, successorShardsIds);
+    }
+    return successorShardsIds;
+  }
+
+  /** Adds a {@link EventRecords} iterator for shardId and event to {@link #eventQueue}. */
+  void enqueueEvent(String shardId, SubscribeToShardEvent event) {
+    eventQueue.offer(new EventRecords(shardId, event));
+  }
+
+  Instant getWatermark() {
+    return watermarkPolicy.getWatermark();
+  }
+
+  /** This is assumed to be never called before {@link #start} is called. */
+  KinesisReaderCheckpoint getCheckpointMark() {
+    List<ShardCheckpoint> checkpoints = new ArrayList<>();
+    for (ShardState shardState : state.values()) {
+      checkpoints.add(shardState.toCheckpoint());
+    }
+
+    return new KinesisReaderCheckpoint(checkpoints);
+  }
+
+  void stop() {
+    LOG.info("Stopping pool {}", poolId);
+    isStopped = true;
+    state.forEach((shardId, st) -> st.subscriber.cancel());
+    scheduler.shutdownNow(); // immediately discard all scheduled tasks
+  }
+
+  /**
+   * Mutable class tracking state and progress per shard.
+   *
+   * <p>When {@link #getCheckpointMark()} is called, {@link ShardCheckpoint} instances are created
+   * from these objects, and 3 cases are possible:
+   *
+   * <ul>
+   *   <li>Pool is just created, and a shard never gave out any record - {@link ShardCheckpoint}
+   *       falls back to {@link ShardState#initCheckpoint}
+   *   <li>Pool was running and got re-shard events - same as above
+   *   <li>Pool was running, and gave out events - use {@link ShardState#sequenceNumber} and {@link
+   *       ShardState#subSequenceNumber}
+   * </ul>
+   */
+  private static class ShardState {
+    final EFOShardSubscriber subscriber;
+    final ShardCheckpoint initCheckpoint;
+
+    @Nullable String sequenceNumber = null;
+    long subSequenceNumber = 0L;
+
+    ShardState(EFOShardSubscriber subscriber, ShardCheckpoint initCheckpoint) {
+      this.subscriber = subscriber;
+      this.initCheckpoint = initCheckpoint;
+    }
+
+    void update(KinesisClientRecord r) {
+      sequenceNumber = checkNotNull(r.sequenceNumber());
+      subSequenceNumber = r.subSequenceNumber();
+    }
+
+    /**
+     * To be used for end-of-record handling / heartbeat records.
+     *
+     * <p>{@link #subSequenceNumber} cannot be reset to 0 here, because otherwise the
+     * end-of-aggregated-record event would erase progress in consuming aggregated records.
+     *
+     * @param eventRecords the event that was fully consumed
+     */
+    void update(EventRecords eventRecords) {
+      sequenceNumber = checkNotNull(eventRecords.event.continuationSequenceNumber());
+      subscriber.ackEvent();
+    }
+
+    /**
+     * Follows semantics of {@link ShardCheckpoint#moveAfter(KinesisRecord)}, e.g. it will always
+     * persist {@link ShardIteratorType#AFTER_SEQUENCE_NUMBER} as soon as some record gets its
+     * {@link #sequenceNumber} registered.
+     */
+    ShardCheckpoint toCheckpoint() {
+      if (sequenceNumber != null) {
+        return new ShardCheckpoint(
+            initCheckpoint.getStreamName(),
+            initCheckpoint.getShardId(),
+            ShardIteratorType.AFTER_SEQUENCE_NUMBER,
+            sequenceNumber,
+            subSequenceNumber);
+      } else {
+        // sequenceNumber was never updated for this shard,
+        // fall back to its init checkpoint
+        return initCheckpoint;
+      }
+    }
+
+    boolean recordWasNotCheckPointedYet(KinesisRecord r) {

Review Comment:
   This name is a bit confusing: there might have been other checkpoints after the initial checkpoint.
   How about renaming it to `isAfterInitialCheckpoint`? Also, a quick javadoc explaining why this is necessary would be helpful.
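
   For illustration, a possible shape of the renamed method (a sketch only, assuming the current implementation just delegates to `ShardCheckpoint#isBeforeOrAt`):

   ```java
   /**
    * Returns true if {@code r} lies after this shard's initial checkpoint, i.e. it was not yet
    * covered by a previously persisted checkpoint and still has to be emitted.
    *
    * <p>Needed because resuming a subscription may replay records that an earlier checkpoint
    * already acknowledged.
    */
   boolean isAfterInitialCheckpoint(KinesisRecord r) {
     return initCheckpoint.isBeforeOrAt(r);
   }
   ```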



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores the error as {@link #subscriptionError}. This pool will
+   * be stopped the next time {@link #getNextRecord()} is called, while still allowing the {@link
+   * #eventQueue} to be drained. Only once the queue is empty is any {@link #subscriptionError}
+   * propagated. This simplifies state management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {

Review Comment:
   I'm pretty sure this if block must be moved behind the block below where `onEventDone` is called.
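
   Roughly something like this (just a sketch of the reordering, not verified):

   ```java
   if (current.hasNext()) {
     KinesisClientRecord r = current.next();
     KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
     // Flush per-event state (continuation sequence number, re-shard signal) *before*
     // potentially returning the last record of this event, so a checkpoint taken right
     // after this record resumes from the correct position.
     if (!current.hasNext()) {
       onEventDone(shardState, current);
       current = null;
     }
     if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
       shardState.update(r);
       watermarkPolicy.update(kinesisRecord);
       return kinesisRecord;
     }
   } else {
     onEventDone(shardState, current);
     current = null;
   }
   ```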



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will overwrite {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>It is recommended to adjust the runner's settings such that it does not (re)start EFO
+ * consumer(s) more often than once per ~10 seconds. Otherwise, internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException, which will cause a crash loop.

Review Comment:
   Tuning the retry policy of the client should help here.
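
   For example, something along these lines when building the async client (values are illustrative; in KinesisIO the client is normally configured via `ClientConfiguration` / `ClientBuilderFactory`, so treat this as a sketch of the raw SDK knobs):

   ```java
   import java.time.Duration;
   import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
   import software.amazon.awssdk.core.retry.RetryPolicy;
   import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

   // More retries with a fixed delay between attempts. Whether a particular error
   // (e.g. ResourceInUseException) is retried also depends on the configured retry condition.
   KinesisAsyncClient client =
       KinesisAsyncClient.builder()
           .overrideConfiguration(
               ClientOverrideConfiguration.builder()
                   .retryPolicy(
                       RetryPolicy.builder()
                           .numRetries(8)
                           .backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofSeconds(10)))
                           .build())
                   .build())
           .build();
   ```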



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will overwrite {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>It is recommended to adjust the runner's settings such that it does not (re)start EFO
+ * consumer(s) more often than once per ~10 seconds. Otherwise, internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException, which will cause a crash loop.
+ *
+ * <p>When consuming from a stream that is re-sharded frequently, the EFO source may eventually get
+ * skewed load among runner workers: some may end up with no active shard subscriptions at all.
+ *
+ * <h5>Enhanced Fan-Out and KinesisIO state management</h5>
+ *
+ * <p>Different runners may behave differently when a Beam application is started from a persisted
+ * state. Examples of persisted state are:
+ *
+ * <ul>
+ *   <li><a href="https://cloud.google.com/dataflow/docs/guides/using-snapshots">GCP Dataflow
+ *       snapshots</a>
+ *   <li><a
+ *       href="https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/">Flink
+ *       savepoints</a>
+ *   <li><a
+ *       href="https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-fault-snapshot.html">Kinesis
+ *       Data Analytics snapshots</a>
+ * </ul>
+ *
+ * <p>Depending on their internals, runners may persist the <b>entire</b> {@link Read} object inside
+ * the state, like the Flink runner does. This means that, once enabled via {@link
+ * Read#withConsumerArn(String)} in the Flink runner, further changes to {@link
+ * Read#withConsumerArn(String)} won't take effect as long as the Beam application starts from a
+ * savepoint.
+ *
+ * <p>If your runner persists the {@link Read} object, disabling or changing the consumer ARN when
+ * restoring from persisted state can be done via {@link KinesisIOOptions#setKinesisIOConsumerArns(Map)}:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": " < new consumer ARN > ",  <- updated ARN
+ *   "stream-02": null,  <- disabling EFO
+ *   ...
+ * }}</pre>
+ *
+ * <p>EFO can be enabled / disabled at any time without losing the consumer's positions in shards
+ * which were already checkpoint-ed. The consumer ARN for a given stream can be changed at any time, too.
+ *
+ * <h5>Enhanced Fan-Out and other KinesisIO settings</h5>
+ *
+ * <p>When EFO is enabled, the following configurations are ignored:
+ *
+ * <ul>
+ *   <li>{@link Read#withMaxCapacityPerShard(Integer)}

Review Comment:
   Actually, this logically corresponds to our in-flight limit per shard. We might as well just use the configuration.
   Though the current default (10k) is probably way too large in our case. But if made nullable, the default could be based on the use case.
   
   This could also be a follow-up ticket.
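
   A rough sketch of what reusing the existing setting could look like (assuming `getMaxCapacityPerShard()` is made nullable; the helper and default are illustrative):

   ```java
   // Hypothetical helper: derive the per-shard in-flight event limit from the Read spec,
   // falling back to a small EFO-specific default instead of the hard-coded constant.
   private static final int DEFAULT_EFO_IN_FLIGHT_LIMIT = 10;

   static int inFlightLimit(KinesisIO.Read read) {
     Integer configured = read.getMaxCapacityPerShard();
     return configured != null ? configured : DEFAULT_EFO_IN_FLIGHT_LIMIT;
   }
   ```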



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will overwrite {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>It is recommended to adjust the runner's settings such that it does not (re)start EFO
+ * consumer(s) more often than once per ~10 seconds. Otherwise, internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException, which will cause a crash loop.
+ *
+ * <p>When consuming from a stream that is re-sharded frequently, the EFO source may eventually get
+ * skewed load among runner workers: some may end up with no active shard subscriptions at all.
+ *
+ * <h5>Enhanced Fan-Out and KinesisIO state management</h5>
+ *
+ * <p>Different runners may behave differently when a Beam application is started from a persisted
+ * state. Examples of persisted state are:
+ *
+ * <ul>
+ *   <li><a href="https://cloud.google.com/dataflow/docs/guides/using-snapshots">GCP Dataflow
+ *       snapshots</a>
+ *   <li><a
+ *       href="https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/">Flink
+ *       savepoints</a>
+ *   <li><a
+ *       href="https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-fault-snapshot.html">Kinesis
+ *       Data Analytics snapshots</a>
+ * </ul>
+ *
+ * <p>Depending on their internals, runners may persist the <b>entire</b> {@link Read} object inside
+ * the state, like the Flink runner does. This means that, once enabled via {@link
+ * Read#withConsumerArn(String)} in the Flink runner, further changes to {@link
+ * Read#withConsumerArn(String)} won't take effect as long as the Beam application starts from a
+ * savepoint.
+ *
+ * <p>If your runner persists the {@link Read} object, disabling or changing the consumer ARN when
+ * restoring from persisted state can be done via {@link KinesisIOOptions#setKinesisIOConsumerArns(Map)}:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": " < new consumer ARN > ",  <- updated ARN
+ *   "stream-02": null,  <- disabling EFO
+ *   ...
+ * }}</pre>
+ *
+ * <p>EFO can be enabled / disabled at any time without losing the consumer's positions in shards
+ * which were already checkpoint-ed. The consumer ARN for a given stream can be changed at any time, too.
+ *
+ * <h5>Enhanced Fan-Out and other KinesisIO settings</h5>
+ *
+ * <p>When EFO is enabled, the following configurations are ignored:
+ *
+ * <ul>
+ *   <li>{@link Read#withMaxCapacityPerShard(Integer)}
+ *   <li>{@link Read#withRequestRecordsLimit(int)}
+ *   <li>{@link Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}
+ *   <li>{@link Read#withFixedDelayRateLimitPolicy()}
+ *   <li>{@link Read#withDynamicDelayRateLimitPolicy(Supplier)}
+ *   <li>{@link Read#withUpToDateThreshold(Duration)}

Review Comment:
   This also requires a follow-up ticket to support dynamic scaling; see `getSplitBacklogBytes` in `KinesisSource`.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+class EFOKinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOKinesisReader.class);
+
+  private final KinesisIO.Read spec;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+  private final KinesisSource source;
+  private final KinesisReaderCheckpoint initCheckpoint;
+
+  private @Nullable KinesisRecord currentRecord = null;
+  private @Nullable EFOShardSubscribersPool shardSubscribersPool = null;
+
+  EFOKinesisReader(
+      KinesisIO.Read spec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      KinesisReaderCheckpoint initCheckpoint,
+      KinesisSource source) {
+    this.spec = checkArgumentNotNull(spec);
+    this.consumerArn = checkArgumentNotNull(consumerArn);
+    this.kinesis = checkArgumentNotNull(kinesis);
+    this.initCheckpoint = checkArgumentNotNull(initCheckpoint);
+    this.source = source;
+  }
+
+  @Override
+  @SuppressWarnings("dereference.of.nullable")
+  public boolean start() throws IOException {
+    LOG.info("Starting reader using {}", initCheckpoint);
+    try {
+      shardSubscribersPool = createPool();
+      shardSubscribersPool.start(initCheckpoint);

Review Comment:
   Alternatively, you could use `@MonotonicNonNull` instead of `@Nullable`.
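
   That would look roughly like this (sketch):

   ```java
   import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

   // @MonotonicNonNull documents that the field, once assigned in start(), never becomes null again.
   private @MonotonicNonNull EFOShardSubscribersPool shardSubscribersPool = null;
   ```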



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores the error as {@link #subscriptionError}. This pool will
+   * be stopped the next time {@link #getNextRecord()} is called, while still allowing the {@link
+   * #eventQueue} to be drained. Only once the queue is empty is any {@link #subscriptionError}
+   * propagated. This simplifies state management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);
+            return kinesisRecord;
+          }
+          // Make sure to update shard state accordingly if `current` does not contain any more
+          // events. This is necessary to account for any re-sharding, so we could correctly resume
+          // from a checkpoint if taken once we advanced to the record returned by getNextRecord().
+          if (!current.hasNext()) {
+            onEventDone(shardState, current);
+            current = null;
+          }
+        } else {
+          onEventDone(shardState, current);
+          current = null;
+        }
+      } else if (subscriptionError != null) {
+        stop();
+        throw new IOException(subscriptionError);
+      } else {
+        return null; // no record available, queue is empty
+      }
+    }
+  }
+
+  /**
+   * Unsets {@link #current} and updates {@link #state} accordingly.
+   *
+   * <p>If {@link SubscribeToShardEvent#continuationSequenceNumber()} is defined, update {@link
+   * ShardState} accordingly. Otherwise, if {@link SubscribeToShardEvent#childShards()} is present,
+   * handle re-sharding: remove the old shard from {@link #state} and add the new ones at TRIM_HORIZON.
+   *
+   * <p>In case of re-sharding, start all new {@link EFOShardSubscriber#subscribe subscriptions}
+   * with the subscription {@link #errorHandler} if there is no {@link #subscriptionError} yet.
+   */
+  private void onEventDone(ShardState shardState, EventRecords noRecordsEvent) {

Review Comment:
   `noRecordsEvent` might be confusing: there may well have been records in the event, it's just that everything is processed at this point. Maybe just rename it to `event`?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.INITIALIZED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.PAUSED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.RUNNING;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.STOPPED;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import io.netty.channel.ChannelException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+class EFOShardSubscriber {
+  enum State {
+    INITIALIZED, // Initialized, but not started yet
+    RUNNING, // Subscriber started
+    PAUSED, // Subscriber paused due to backpressure
+    STOPPED // Subscriber stopped
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
+  private static final Integer IN_FLIGHT_LIMIT = 10;
+
+  private final EFOShardSubscribersPool pool;
+  private final String consumerArn;
+
+  // Shard id of this subscriber
+  private final String shardId;
+
+  private final KinesisAsyncClient kinesis;
+
+  /** Internal subscriber state. */
+  private volatile State state = INITIALIZED;
+
+  /**
+   * Kept only for cases when a subscription starts and then fails with a non-critical error, before
+   * any event updates {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private @MonotonicNonNull StartingPosition initialPosition;
+
+  /**
+   * Completes once this shard subscriber is done, either normally (stopped or shard is completely
+   * consumed) or exceptionally due to a non retry-able error.
+   */
+  private final CompletableFuture<Void> done = new CompletableFuture<>();
+
+  private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();
+
+  /** Tracks number of delivered events in flight (until ack-ed). */
+  private final AtomicInteger inFlight = new AtomicInteger();
+
+  /**
+   * Async completion handler for {@link KinesisAsyncClient#subscribeToShard} that:
+   * <li>exits immediately if {@link #done} is already completed (exceptionally),
+   * <li>re-subscribes at {@link ShardEventsSubscriber#sequenceNumber} for retryable errors such as
+   *     retryable {@link SdkException}, {@link ClosedChannelException}, {@link ChannelException},
+   *     {@link TimeoutException} (any of these might be wrapped in {@link CompletionException}s)
+   * <li>or completes {@link #done} exceptionally for any other error,
+   * <li>completes {@link #done} normally if subscriber {@link #state} is {@link State#STOPPED} or
+   *     if shard completed (no further {@link ShardEventsSubscriber#sequenceNumber}),
+   * <li>or otherwise re-subscribes at {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private final BiConsumer<Void, Throwable> reSubscriptionHandler;
+
+  private static boolean isRetryable(Throwable error) {
+    Throwable cause = unwrapCompletionException(error);
+    if (cause instanceof SdkException && ((SdkException) cause).retryable()) {
+      return true; // retryable SDK exception
+    }
+    // check the root cause for issues that can be addressed using retries
+    cause = Throwables.getRootCause(cause);
+    return cause instanceof ClosedChannelException // Java Nio
+        || cause instanceof TimeoutException // Java
+        || cause instanceof ChannelException; // Netty (e.g. ReadTimeoutException)
+  }
+
+  /** Loops through completion exceptions until we get the underlying cause. */
+  private static Throwable unwrapCompletionException(Throwable completionException) {
+    Throwable current = completionException;
+    while (current instanceof CompletionException) {
+      Throwable cause = current.getCause();
+      if (cause != null) {
+        current = cause;
+      } else {
+        return current;
+      }
+    }
+    return current;
+  }
+
+  @SuppressWarnings({"FutureReturnValueIgnored", "all"})

Review Comment:
   If possible, please don't suppress `all`. What issues remain here?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.INITIALIZED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.PAUSED;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.RUNNING;
+import static org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscriber.State.STOPPED;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import io.netty.channel.ChannelException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+class EFOShardSubscriber {
+  enum State {
+    INITIALIZED, // Initialized, but not started yet
+    RUNNING, // Subscriber started
+    PAUSED, // Subscriber paused due to backpressure
+    STOPPED // Subscriber stopped
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
+  private static final Integer IN_FLIGHT_LIMIT = 10;
+
+  private final EFOShardSubscribersPool pool;
+  private final String consumerArn;
+
+  // Shard id of this subscriber
+  private final String shardId;
+
+  private final KinesisAsyncClient kinesis;
+
+  /** Internal subscriber state. */
+  private volatile State state = INITIALIZED;
+
+  /**
+   * Kept only for cases when a subscription starts and then fails with a non-critical error, before
+   * any event updates {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private @MonotonicNonNull StartingPosition initialPosition;
+
+  /**
+   * Completes once this shard subscriber is done, either normally (stopped or shard is completely
+   * consumed) or exceptionally due to a non retry-able error.
+   */
+  private final CompletableFuture<Void> done = new CompletableFuture<>();
+
+  private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();
+
+  /** Tracks number of delivered events in flight (until ack-ed). */
+  private final AtomicInteger inFlight = new AtomicInteger();
+
+  /**
+   * Async completion handler for {@link KinesisAsyncClient#subscribeToShard} that:
+   * <li>exits immediately if {@link #done} is already completed (exceptionally),
+   * <li>re-subscribes at {@link ShardEventsSubscriber#sequenceNumber} for retryable errors such as
+   *     retryable {@link SdkException}, {@link ClosedChannelException}, {@link ChannelException},
+   *     {@link TimeoutException} (any of these might be wrapped in {@link CompletionException}s)
+   * <li>or completes {@link #done} exceptionally for any other error,
+   * <li>completes {@link #done} normally if subscriber {@link #state} is {@link State#STOPPED} or
+   *     if shard completed (no further {@link ShardEventsSubscriber#sequenceNumber}),
+   * <li>or otherwise re-subscribes at {@link ShardEventsSubscriber#sequenceNumber}.
+   */
+  private final BiConsumer<Void, Throwable> reSubscriptionHandler;
+
+  private static boolean isRetryable(Throwable error) {
+    Throwable cause = unwrapCompletionException(error);
+    if (cause instanceof SdkException && ((SdkException) cause).retryable()) {
+      return true; // retryable SDK exception
+    }
+    // check the root cause for issues that can be addressed using retries
+    cause = Throwables.getRootCause(cause);
+    return cause instanceof ClosedChannelException // Java Nio
+        || cause instanceof TimeoutException // Java
+        || cause instanceof ChannelException; // Netty (e.g. ReadTimeoutException)
+  }
+
+  /** Loops through completion exceptions until we get the underlying cause. */
+  private static Throwable unwrapCompletionException(Throwable completionException) {
+    Throwable current = completionException;
+    while (current instanceof CompletionException) {
+      Throwable cause = current.getCause();
+      if (cause != null) {
+        current = cause;
+      } else {
+        return current;
+      }
+    }
+    return current;
+  }
+
+  @SuppressWarnings({"FutureReturnValueIgnored", "all"})
+  EFOShardSubscriber(
+      EFOShardSubscribersPool pool,
+      String shardId,
+      KinesisIO.Read read,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.pool = pool;
+    this.consumerArn = consumerArn;
+    this.shardId = shardId;
+    this.kinesis = kinesis;
+    this.reSubscriptionHandler =
+        (Void unused, Throwable error) -> {
+          eventsSubscriber.cancel();
+
+          if (error != null && !isRetryable(error)) {
+            done.completeExceptionally(error);
+            return;
+          }
+
+          if (error != null && isRetryable(error) && state != STOPPED) {
+            String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
+            if (inFlight.get() == IN_FLIGHT_LIMIT) {
+              state = PAUSED;
+            } else {
+              if (lastContinuationSequenceNumber != null) {
+                pool.delayedTask(

Review Comment:
   Nitpick: I'd suggest adding comments explaining both delayed re-subscriptions.
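
   For example, comment sketches for the two branches (wording only, adjust as needed):

   ```java
   // lastContinuationSequenceNumber != null:
   //   A retryable error occurred after events were already received. Schedule a delayed
   //   re-subscription (after onErrorCoolDownMs) at the last seen continuation sequence
   //   number so that no records are skipped and SubscribeToShard is not retried in a tight loop.

   // lastContinuationSequenceNumber == null:
   //   A retryable error occurred before any event arrived. Schedule a delayed
   //   re-subscription at the original starting position this subscriber was created with.
   ```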



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOOptions.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.auto.service.AutoService;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * PipelineOptions for {@link KinesisIO}.
+ *
+ * <p>Allows passing modifiable configurations in cases when some runner implementation persists
+ * serialized {@link KinesisIO.Read} objects. Adding new configurations to this class should be
+ * exceptional, and standard {@link KinesisIO.Read} / {@link KinesisIO.Write} should be preferred in
+ * most cases.
+ *
+ * <p>This class appeared during the implementation of the EFO consumer. In the Flink runner, {@link
+ * KinesisIO.Read} is serialized with the entire {@link KinesisSource} object, which was a problem
+ * for the EFO feature design: if the consumer ARN is part of the KinesisIO.Read object, then, when
+ * started from a Flink savepoint, the consumer ARN value (or null) stored in the savepoint would be
+ * enforced.
+ *
+ * <p>Consequences of this are:
+ *
+ * <ol>
+ *   <li>Once a Kinesis source is started, its consumer ARN can't be changed without losing state
+ *       (checkpoint-ed shard progress).
+ *   <li>A Kinesis source can not seamlessly enable / disable the EFO feature without losing
+ *       state (checkpoint-ed shard progress).
+ * </ol>
+ *
+ * <p>This {@link PipelineOptions} extension allows having modifiable configurations for {@link
+ * org.apache.beam.sdk.io.UnboundedSource#split(int, PipelineOptions)} and {@link
+ * org.apache.beam.sdk.io.UnboundedSource#createReader(PipelineOptions,
+ * UnboundedSource.CheckpointMark)}, which is essential for seamless EFO switch on / off.
+ */
+@Experimental(Kind.SOURCE_SINK)
+public interface KinesisIOOptions extends PipelineOptions {
+  /**
+   * Used to enable / disable EFO.
+   *
+   * <p>Example:
+   *
+   * <pre>{@code --kinesisIOConsumerArns={
+   *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+   *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+   *   ...
+   * }}</pre>
+   */
+  @Description("Mapping of streams' names to consumer ARNs of those streams.")
+  @Default.InstanceFactory(MapFactory.class)
+  Map<String, String> getKinesisIOConsumerArns();
+
+  void setKinesisIOConsumerArns(Map<String, String> value);
+
+  class MapFactory implements DefaultValueFactory<HashMap<String, String>> {
+
+    @Override
+    public HashMap<String, String> create(PipelineOptions options) {
+      return new HashMap<>();

Review Comment:
   Could be `ImmutableMap.of()`
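   
   E.g., roughly (just a sketch; this assumes the factory's declared type is relaxed from `HashMap` to `Map` so the immutable instance satisfies it):
   
   ```java
   import java.util.Map;
   import org.apache.beam.sdk.options.DefaultValueFactory;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
   
   class MapFactory implements DefaultValueFactory<Map<String, String>> {
     @Override
     public Map<String, String> create(PipelineOptions options) {
       // Empty immutable default; real mappings only ever come from pipeline options.
       return ImmutableMap.of();
     }
   }
   ```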



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);
+            return kinesisRecord;
+          }
+          // Make sure to update shard state accordingly if `current` does not contain any more
+          // events. This is necessary to account for any re-sharding, so we could correctly resume
+          // from a checkpoint if taken once we advanced to the record returned by getNextRecord().
+          if (!current.hasNext()) {
+            onEventDone(shardState, current);
+            current = null;
+          }
+        } else {
+          onEventDone(shardState, current);
+          current = null;
+        }
+      } else if (subscriptionError != null) {
+        stop();
+        throw new IOException(subscriptionError);
+      } else {
+        return null; // no record available, queue is empty
+      }
+    }
+  }
+
+  /**
+   * Unsets {@link #current} and updates {@link #state} accordingly.
+   *
+   * <p>If {@link SubscribeToShardEvent#continuationSequenceNumber()} is defined, update {@link
+   * ShardState} accordingly. Otherwise, or if {@link SubscribeToShardEvent#childShards()} exists,
+   * handle re-sharding: remove old shard from {@link #state} and add new ones at TRIM_HORIZON.
+   *
+   * <p>In case of re-sharding, start all new {@link EFOShardSubscriber#subscribe subscriptions}
+   * with the subscription {@link #errorHandler} if there is no {@link #subscriptionError} yet.
+   */
+  private void onEventDone(ShardState shardState, EventRecords noRecordsEvent) {
+    if (noRecordsEvent.event.continuationSequenceNumber() == null
+        && noRecordsEvent.event.hasChildShards()) {
+      LOG.info("Processing re-shard signal {}", noRecordsEvent.event);
+      List<String> successorShardsIds = computeSuccessorShardsIds(noRecordsEvent);
+      for (String successorShardId : successorShardsIds) {
+        ShardCheckpoint newCheckpoint =
+            new ShardCheckpoint(
+                read.getStreamName(),
+                successorShardId,
+                new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
+        state.computeIfAbsent(
+            successorShardId,
+            id -> new ShardState(initShardSubscriber(newCheckpoint), newCheckpoint));
+      }
+
+      state.remove(noRecordsEvent.shardId);
+    } else {
+      shardState.update(noRecordsEvent);
+    }
+  }
+
+  /**
+   * Always initialize a new subscriber to make sure checkpoints will be correct. But only start the
+   * subscription if there is no {@link #subscriptionError}.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private EFOShardSubscriber initShardSubscriber(ShardCheckpoint cp) {
+    EFOShardSubscriber subscriber =
+        new EFOShardSubscriber(
+            this, cp.getShardId(), read, consumerArn, kinesis, onErrorCoolDownMs);
+    StartingPosition startingPosition = cp.toEFOStartingPosition();
+    if (subscriptionError == null) {
+      subscriber.subscribe(startingPosition).whenCompleteAsync(errorHandler);
+    }
+    return subscriber;
+  }
+
+  private static List<String> computeSuccessorShardsIds(EventRecords records) {
+    List<String> successorShardsIds = new ArrayList<>();
+    SubscribeToShardEvent event = records.event;
+    for (ChildShard childShard : event.childShards()) {
+      if (childShard.parentShards().contains(records.shardId)) {
+        if (childShard.parentShards().size() > 1) {
+          // This is the case of merging two shards into one.
+          // When there are 2 parent shards, we only pick it up if
+          // the max of its parent shard IDs equals the sender shard ID.
+          String maxParentId = childShard.parentShards().stream().max(String::compareTo).get();
+          if (records.shardId.equals(maxParentId)) {
+            successorShardsIds.add(childShard.shardId());
+          }
+        } else {
+          // This is the case when shard is split - we must add both
+          // and start subscriptions for them.
+          successorShardsIds.add(childShard.shardId());
+        }
+      }
+    }
+
+    if (successorShardsIds.isEmpty()) {
+      LOG.info("Found no successors for shard {}", records.shardId);

Review Comment:
   Nitpick, add poolId here and below?
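   
   E.g., something like this (a sketch only; it assumes `poolId` is passed in or the method is made an instance method, since `computeSuccessorShardsIds` is currently static):
   
   ```java
   if (successorShardsIds.isEmpty()) {
     LOG.info("Pool {} found no successors for shard {}", poolId, records.shardId);
   } else {
     LOG.info(
         "Pool {} found successors for shard {}: {}", poolId, records.shardId, successorShardsIds);
   }
   ```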



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will override the {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>Adjusting runner's settings is recommended - such that it does not (re)start EFO consumer(s)

Review Comment:
   Not sure I fully understand what you mean here.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java:
##########
@@ -147,6 +149,92 @@
  * Read#withCustomRateLimitPolicy(RateLimitPolicyFactory)}. This requires implementing {@link
  * RateLimitPolicy} with a corresponding {@link RateLimitPolicyFactory}.
  *
+ * <h4>Enhanced Fan-Out</h4>
+ *
+ * Kinesis IO supports Consumers with Dedicated Throughput (Enhanced Fan-Out, EFO). This type of
+ * consumer doesn't have to contend with other consumers that are receiving data from the stream.
+ *
+ * <p>More details can be found here: <a
+ * href="https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html">Consumers with
+ * Dedicated Throughput</a>
+ *
+ * <p>Primary method of enabling EFO is setting {@link Read#withConsumerArn(String)}:
+ *
+ * <pre>{@code
+ * p.apply(KinesisIO.read()
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withConsumerArn("arn:aws:kinesis:.../streamConsumer:12345678"))
+ * }</pre>
+ *
+ * <p>Alternatively, EFO can be enabled for one or more {@link Read} instances via pipeline options:
+ *
+ * <pre>{@code --kinesisIOConsumerArns={
+ *   "stream-01": "arn:aws:kinesis:...:stream/stream-01/consumer/consumer-01:1678576714",
+ *   "stream-02": "arn:aws:kinesis:...:stream/stream-02/consumer/my-consumer:1679576982",
+ *   ...
+ * }}</pre>
+ *
+ * <p>If set, pipeline options will override the {@link Read#withConsumerArn(String)} setting. Check
+ * {@link KinesisIOOptions} for more details.
+ *
+ * <p>Depending on the downstream processing performance, the EFO consumer will back-pressure
+ * internally.
+ *
+ * <p>Adjusting runner's settings is recommended - such that it does not (re)start EFO consumer(s)
+ * faster than once per ~ 10 seconds. Internal calls to {@link
+ * KinesisAsyncClient#subscribeToShard(SubscribeToShardRequest, SubscribeToShardResponseHandler)}
+ * may throw ResourceInUseException otherwise, which will cause a crash loop.
+ *
+ * <p>The EFO source, when consuming from a stream with frequent re-sharding, may eventually get skewed

Review Comment:
   I think this is more of a Kinesis issue than a Beam source issue, right?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);

Review Comment:
   I actually haven't looked closely enough into watermarks yet; I had wanted to do that multiple times but always forgot. Particularly when backfilling / consuming from the past, I expect we have an issue here if a pool contains multiple shards: watermarks of the different shards don't necessarily progress in sync.
   
   Two options: we can try to sync processing so the watermarks of each shard match as closely as possible, but that's a bit more complex. Or, alternatively, we track watermarks per shard and return the min of these. As far as I know, that's how it's done in the non-EFO source.
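   
   A rough sketch of the per-shard option (names here are illustrative, not the actual PR code; `ShardState` would need access to the `WatermarkPolicyFactory` from the read spec):
   
   ```java
   // Sketch only: each ShardState tracks its own watermark instead of the pool-level policy.
   class ShardState {
     private final WatermarkPolicy watermarkPolicy;
   
     ShardState(WatermarkPolicyFactory watermarkPolicyFactory) {
       // existing constructor args (subscriber, checkpoint) omitted for brevity
       this.watermarkPolicy = watermarkPolicyFactory.createWatermarkPolicy();
     }
   
     // Called from getNextRecord() for every record returned for this shard.
     void updateWatermark(KinesisRecord record) {
       watermarkPolicy.update(record);
     }
   
     Instant watermark() {
       return watermarkPolicy.getWatermark();
     }
   }
   ```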
   



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);
+            return kinesisRecord;
+          }
+          // Make sure to update shard state accordingly if `current` does not contain any more
+          // events. This is necessary to account for any re-sharding, so we could correctly resume
+          // from a checkpoint if taken once we advanced to the record returned by getNextRecord().
+          if (!current.hasNext()) {
+            onEventDone(shardState, current);
+            current = null;
+          }
+        } else {
+          onEventDone(shardState, current);
+          current = null;
+        }
+      } else if (subscriptionError != null) {
+        stop();
+        throw new IOException(subscriptionError);
+      } else {
+        return null; // no record available, queue is empty
+      }
+    }
+  }
+
+  /**
+   * Unsets {@link #current} and updates {@link #state} accordingly.
+   *
+   * <p>If {@link SubscribeToShardEvent#continuationSequenceNumber()} is defined, update {@link
+   * ShardState} accordingly. Otherwise, or if {@link SubscribeToShardEvent#childShards()} exists,
+   * handle re-sharding: remove old shard from {@link #state} and add new ones at TRIM_HORIZON.
+   *
+   * <p>In case of re-sharding, start all new {@link EFOShardSubscriber#subscribe subscriptions}
+   * with the subscription {@link #errorHandler} if there is no {@link #subscriptionError} yet.
+   */
+  private void onEventDone(ShardState shardState, EventRecords noRecordsEvent) {
+    if (noRecordsEvent.event.continuationSequenceNumber() == null
+        && noRecordsEvent.event.hasChildShards()) {
+      LOG.info("Processing re-shard signal {}", noRecordsEvent.event);
+      List<String> successorShardsIds = computeSuccessorShardsIds(noRecordsEvent);
+      for (String successorShardId : successorShardsIds) {
+        ShardCheckpoint newCheckpoint =
+            new ShardCheckpoint(
+                read.getStreamName(),
+                successorShardId,
+                new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
+        state.computeIfAbsent(
+            successorShardId,
+            id -> new ShardState(initShardSubscriber(newCheckpoint), newCheckpoint));
+      }
+
+      state.remove(noRecordsEvent.shardId);
+    } else {
+      shardState.update(noRecordsEvent);
+    }
+  }
+
+  /**
+   * Always initialize a new subscriber to make sure checkpoints will be correct. But only start the
+   * subscription if there is no {@link #subscriptionError}.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private EFOShardSubscriber initShardSubscriber(ShardCheckpoint cp) {
+    EFOShardSubscriber subscriber =
+        new EFOShardSubscriber(
+            this, cp.getShardId(), read, consumerArn, kinesis, onErrorCoolDownMs);
+    StartingPosition startingPosition = cp.toEFOStartingPosition();
+    if (subscriptionError == null) {
+      subscriber.subscribe(startingPosition).whenCompleteAsync(errorHandler);
+    }
+    return subscriber;
+  }
+
+  private static List<String> computeSuccessorShardsIds(EventRecords records) {
+    List<String> successorShardsIds = new ArrayList<>();
+    SubscribeToShardEvent event = records.event;
+    for (ChildShard childShard : event.childShards()) {
+      if (childShard.parentShards().contains(records.shardId)) {
+        if (childShard.parentShards().size() > 1) {
+          // This is the case of merging two shards into one.
+          // When there are 2 parent shards, we only pick it up if
+          // the max of its parent shard IDs equals the sender shard ID.
+          String maxParentId = childShard.parentShards().stream().max(String::compareTo).get();
+          if (records.shardId.equals(maxParentId)) {
+            successorShardsIds.add(childShard.shardId());
+          }
+        } else {
+          // This is the case when shard is split - we must add both
+          // and start subscriptions for them.
+          successorShardsIds.add(childShard.shardId());
+        }
+      }
+    }
+
+    if (successorShardsIds.isEmpty()) {
+      LOG.info("Found no successors for shard {}", records.shardId);
+    } else {
+      LOG.info("Found successors for shard {}: {}", records.shardId, successorShardsIds);
+    }
+    return successorShardsIds;
+  }
+
+  /** Adds a {@link EventRecords} iterator for shardId and event to {@link #eventQueue}. */
+  void enqueueEvent(String shardId, SubscribeToShardEvent event) {
+    eventQueue.offer(new EventRecords(shardId, event));
+  }
+
+  Instant getWatermark() {
+    return watermarkPolicy.getWatermark();

Review Comment:
   See my comment above on watermarks: we would have to return an aggregated watermark from the per-shard watermarks we track.
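   
   E.g., roughly, assuming each `ShardState` tracks its own watermark as sketched in my earlier comment (the empty-pool case would need a sensible default):
   
   ```java
   Instant getWatermark() {
     // The pool watermark is the minimum of the per-shard watermarks, so the
     // slowest shard holds the watermark back (same idea as the non-EFO source).
     Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
     for (ShardState shardState : state.values()) {
       Instant shardWatermark = shardState.watermark();
       if (shardWatermark.isBefore(min)) {
         min = shardWatermark;
       }
     }
     return min;
   }
   ```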



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardFilter;
+import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
+import software.amazon.kinesis.common.InitialPositionInStream;
+
+class ShardListingUtils {

Review Comment:
   Why move this code from SimplifiedKinesisClient into a new place? It would be good to minimize changes to existing code in this PR. Could this be a follow-up? Skipping over these changes for now, but I can get back to this later.
   



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ForwardingIterator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@SuppressWarnings({"nullness"})
+class EFOShardSubscribersPool {
+  private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
+  private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1_000;
+  private final int onErrorCoolDownMs;
+
+  /**
+   * Identifier of the current subscribers pool.
+   *
+   * <p>Injected into other objects which belong to this pool to ease tracing with logs.
+   */
+  private final UUID poolId;
+
+  private final KinesisIO.Read read;
+  private final String consumerArn;
+  private final KinesisAsyncClient kinesis;
+
+  /**
+   * Unbounded queue of events, but events in-flight are limited by the {@link EFOShardSubscriber}.
+   */
+  private final ConcurrentLinkedQueue<EventRecords> eventQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * State map of currently active shards that can be checkpoint-ed.
+   *
+   * <p>This map may only be accessed and updated from within {@link #start}, {@link #getNextRecord}
+   * and dependent {@link #onEventDone} to prevent race conditions.
+   */
+  private final Map<String, ShardState> state = new HashMap<>();
+
+  /**
+   * Async subscription error (as first seen), if set all subscribers must be cancelled and no new
+   * ones started.
+   *
+   * <p>Must be volatile as it is accessed from various threads. But it's best effort, setting this
+   * doesn't have to be atomic.
+   */
+  private volatile @MonotonicNonNull Throwable subscriptionError;
+
+  /**
+   * May only ever be altered from within {@link #stop()} or {@link #getNextRecord()} to prevent
+   * race conditions when cancelling subscribers.
+   */
+  private boolean isStopped = false;
+
+  /**
+   * Async completion callback handling {@link EFOShardSubscriber#subscribe subscriptions} that
+   * terminate exceptionally.
+   *
+   * <p>Unless already in error state, stores error as {@link #subscriptionError}. This pool will be
+   * stopped when {@link #getNextRecord()} is called next, but allowing the {@link #eventQueue} to
+   * be drained. Only once empty any {@link #subscriptionError} is propagated. This simplifies state
+   * management and checkpointing a lot.
+   */
+  private final BiConsumer<Void, Throwable> errorHandler =
+      (Void unused, Throwable error) -> {
+        if (error != null && subscriptionError == null) {
+          subscriptionError = error;
+        }
+      };
+
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+  // EventRecords iterator that is currently consumed
+  @Nullable EventRecords current = null;
+
+  private final WatermarkPolicy watermarkPolicy;
+
+  EFOShardSubscribersPool(KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesis) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = ON_ERROR_COOL_DOWN_MS_DEFAULT;
+  }
+
+  EFOShardSubscribersPool(
+      KinesisIO.Read readSpec,
+      String consumerArn,
+      KinesisAsyncClient kinesis,
+      int onErrorCoolDownMs) {
+    this.poolId = UUID.randomUUID();
+    this.read = readSpec;
+    this.consumerArn = consumerArn;
+    this.kinesis = kinesis;
+    this.watermarkPolicy = read.getWatermarkPolicyFactory().createWatermarkPolicy();
+    this.onErrorCoolDownMs = onErrorCoolDownMs;
+  }
+
+  /**
+   * Starts a subscribers pool by starting a {@link EFOShardSubscriber#subscribe shard subscription}
+   * for each {@link ShardCheckpoint} with the subscription {@link #errorHandler} callback.
+   *
+   * <p>{@link EFOShardSubscriber}s with their respective state are tracked in {@link #state}.
+   */
+  void start(Iterable<ShardCheckpoint> checkpoints) {
+    LOG.info(
+        "Starting pool {} {} {}. Checkpoints = {}",
+        poolId,
+        read.getStreamName(),
+        consumerArn,
+        checkpoints);
+    for (ShardCheckpoint shardCheckpoint : checkpoints) {
+      checkState(
+          !state.containsKey(shardCheckpoint.getShardId()),
+          "Duplicate shard id %s",
+          shardCheckpoint.getShardId());
+      ShardState shardState = new ShardState(initShardSubscriber(shardCheckpoint), shardCheckpoint);
+      state.put(shardCheckpoint.getShardId(), shardState);
+    }
+  }
+
+  /**
+   * Returns the next disaggregated {@link KinesisRecord} if available and updates {@link #state}
+   * accordingly so that it reflects a mutable checkpoint AFTER returning that record.
+   *
+   * <p>Async subscription errors are delayed until {@link #eventQueue} is completely drained and
+   * then rethrown here.
+   *
+   * <p>This repeats the following steps until a record or {@code null} was returned:
+   *
+   * <ol>
+   *   <li>If {@link #current} is null and {@link #eventQueue} is empty, return {@code null} unless
+   *       {@link #subscriptionError} is set: in that case rethrow.
+   *   <li>Otherwise if {@link #current} is null, poll next from {@link #eventQueue}.
+   *   <li>If {@link #current} has a next {@link KinesisClientRecord}, update {@link #state}
+   *       accordingly and return the corresponding converted {@link KinesisRecord}, optionally
+   *       triggering {@link #onEventDone} if that was the last record of {@link #current}.
+   *   <li>Finally, if nothing was returned yet, trigger {@link #onEventDone} and continue loop.
+   * </ol>
+   *
+   * <p>It polls the {@link #eventQueue} in a while loop to avoid returning null immediately if an
+   * event without records arrived. There may be events with records after the {@link #current}, and
+   * it is better to poll again instead of having {@link EFOKinesisReader#advance()} signalling
+   * false to Beam. Otherwise, Beam would poll again later, which would introduce unnecessary delay.
+   */
+  @Nullable
+  KinesisRecord getNextRecord() throws IOException {
+    while (true) {
+      if (!isStopped && subscriptionError != null) {
+        // Stop the pool to cancel all subscribers and prevent new subscriptions.
+        // Doing this as part of getNextRecord() avoids concurrent access to the state map and
+        // prevents any related issues.
+        stop();
+      }
+
+      if (current == null) {
+        current = eventQueue.poll();
+      }
+
+      if (current != null) {
+        String shardId = current.shardId;
+        ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId));
+        if (current.hasNext()) {
+          KinesisClientRecord r = current.next();
+          KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
+          if (shardState.recordWasNotCheckPointedYet(kinesisRecord)) {
+            shardState.update(r);
+            watermarkPolicy.update(kinesisRecord);
+            return kinesisRecord;
+          }
+          // Make sure to update the shard state if `current` does not contain any more records.
+          // This is necessary to account for re-sharding, so that a checkpoint taken after
+          // advancing to the record returned by getNextRecord() resumes from the correct position.
+          if (!current.hasNext()) {
+            onEventDone(shardState, current);
+            current = null;
+          }
+        } else {
+          onEventDone(shardState, current);
+          current = null;
+        }
+      } else if (subscriptionError != null) {
+        stop();
+        throw new IOException(subscriptionError);
+      } else {
+        return null; // no record available, queue is empty
+      }
+    }
+  }
+
+  /**
+   * Unsets {@link #current} and updates {@link #state} accordingly.
+   *
+   * <p>If {@link SubscribeToShardEvent#continuationSequenceNumber()} is defined, update the
+   * {@link ShardState} accordingly. Otherwise, if {@link SubscribeToShardEvent#childShards()} is
+   * present, handle re-sharding: remove the old shard from {@link #state} and add its successors
+   * at TRIM_HORIZON.
+   *
+   * <p>In case of re-sharding, start all new {@link EFOShardSubscriber#subscribe subscriptions}
+   * with the subscription {@link #errorHandler} if there is no {@link #subscriptionError} yet.
+   */
+  private void onEventDone(ShardState shardState, EventRecords noRecordsEvent) {
+    if (noRecordsEvent.event.continuationSequenceNumber() == null
+        && noRecordsEvent.event.hasChildShards()) {
+      LOG.info("Processing re-shard signal {}", noRecordsEvent.event);
+      List<String> successorShardsIds = computeSuccessorShardsIds(noRecordsEvent);
+      for (String successorShardId : successorShardsIds) {
+        ShardCheckpoint newCheckpoint =
+            new ShardCheckpoint(
+                read.getStreamName(),
+                successorShardId,
+                new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
+        state.computeIfAbsent(
+            successorShardId,
+            id -> new ShardState(initShardSubscriber(newCheckpoint), newCheckpoint));
+      }
+
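+      // The parent shard is fully consumed once it announces its child shards, drop it here.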
+      state.remove(noRecordsEvent.shardId);
+    } else {
+      shardState.update(noRecordsEvent);
+    }
+  }
+
+  /**
+   * Always initializes a new subscriber so that checkpoints remain correct, but only starts the
+   * subscription if there is no {@link #subscriptionError} yet.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private EFOShardSubscriber initShardSubscriber(ShardCheckpoint cp) {
+    EFOShardSubscriber subscriber =
+        new EFOShardSubscriber(
+            this, cp.getShardId(), read, consumerArn, kinesis, onErrorCoolDownMs);
+    StartingPosition startingPosition = cp.toEFOStartingPosition();
+    if (subscriptionError == null) {
+      subscriber.subscribe(startingPosition).whenCompleteAsync(errorHandler);
+    }
+    return subscriber;
+  }
+
+  private static List<String> computeSuccessorShardsIds(EventRecords records) {
+    List<String> successorShardsIds = new ArrayList<>();
+    SubscribeToShardEvent event = records.event;
+    for (ChildShard childShard : event.childShards()) {
+      if (childShard.parentShards().contains(records.shardId)) {
+        if (childShard.parentShards().size() > 1) {
+          // This is the case of two shards being merged into one.
+          // When there are 2 parent shards, only the parent with the
+          // larger shard id picks up the child to avoid duplicate subscriptions.
+          String maxParentId = childShard.parentShards().stream().max(String::compareTo).get();
+          if (records.shardId.equals(maxParentId)) {
+            successorShardsIds.add(childShard.shardId());
+          }
+        } else {
+          // This is the case of a shard split - every child shard must be
+          // added and a subscription started for it.
+          successorShardsIds.add(childShard.shardId());
+        }
+      }
+    }
+
+    if (successorShardsIds.isEmpty()) {
+      LOG.info("Found no successors for shard {}", records.shardId);
+    } else {
+      LOG.info("Found successors for shard {}: {}", records.shardId, successorShardsIds);
+    }
+    return successorShardsIds;
+  }
+
+  /** Adds an {@link EventRecords} iterator for shardId and event to {@link #eventQueue}. */
+  void enqueueEvent(String shardId, SubscribeToShardEvent event) {
+    eventQueue.offer(new EventRecords(shardId, event));
+  }
+
+  Instant getWatermark() {
+    return watermarkPolicy.getWatermark();
+  }
+
+  /** This is never expected to be called before {@link #start}. */
+  KinesisReaderCheckpoint getCheckpointMark() {
+    List<ShardCheckpoint> checkpoints = new ArrayList<>();

Review Comment:
   ```suggestion
       List<ShardCheckpoint> checkpoints = new ArrayList<>(state.size());
   ```
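
   Pre-sizing the list with the number of tracked shards avoids re-allocations of the backing array while collecting the checkpoints.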



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org