You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/21 08:33:01 UTC

[flink] 01/03: [FLINK-18515][Kinesis] Adding FanOutRecordPublisher for Kinesis EFO support

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbcd0c791371c2c6b3e477a83adfbd78dbee2602
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Fri Sep 4 10:35:35 2020 +0100

    [FLINK-18515][Kinesis] Adding FanOutRecordPublisher for Kinesis EFO support
    
    This closes #13189.
---
 flink-connectors/flink-connector-kinesis/pom.xml   |   1 +
 .../kinesis/config/ConsumerConfigConstants.java    |  15 +-
 .../internals/DynamoDBStreamsDataFetcher.java      |   3 +-
 .../kinesis/internals/KinesisDataFetcher.java      | 100 +++--
 .../kinesis/internals/ShardConsumer.java           |  13 +-
 .../publisher/RecordPublisherFactory.java          |   7 +
 .../publisher/fanout/FanOutRecordPublisher.java    | 246 +++++++++++
 .../fanout/FanOutRecordPublisherConfiguration.java |   2 +-
 .../FanOutRecordPublisherFactory.java}             |  77 ++--
 .../publisher/fanout/FanOutShardSubscriber.java    | 468 +++++++++++++++++++++
 .../publisher/polling/PollingRecordPublisher.java  |   5 -
 .../polling/PollingRecordPublisherFactory.java     |   2 +-
 .../kinesis/proxy/FullJitterBackoff.java           |  61 +++
 .../connectors/kinesis/proxy/KinesisProxy.java     |  41 +-
 .../kinesis/proxy/KinesisProxyInterface.java       |   1 +
 .../connectors/kinesis/proxy/KinesisProxyV2.java   |  19 +-
 .../kinesis/proxy/KinesisProxyV2Interface.java     |  14 +
 .../streaming/connectors/kinesis/util/AWSUtil.java |  12 +-
 .../connectors/kinesis/util/AwsV2Util.java         |  36 +-
 .../connectors/kinesis/util/KinesisConfigUtil.java |   3 +
 .../kinesis/FlinkKinesisConsumerTest.java          |  10 +-
 .../kinesis/internals/KinesisDataFetcherTest.java  |  45 +-
 .../kinesis/internals/ShardConsumerFanOutTest.java | 242 +++++++++++
 .../kinesis/internals/ShardConsumerTest.java       | 111 +----
 .../kinesis/internals/ShardConsumerTestUtils.java  | 129 ++++++
 .../FanOutRecordPublisherConfigurationTest.java    |   1 -
 .../fanout/FanOutRecordPublisherTest.java          | 443 +++++++++++++++++++
 .../polling/PollingRecordPublisherTest.java        |  30 +-
 .../kinesis/proxy/KinesisProxyV2Test.java          |  60 +++
 .../FakeKinesisFanOutBehavioursFactory.java        | 391 +++++++++++++++++
 .../connectors/kinesis/testutils/TestUtils.java    |  41 ++
 .../testutils/TestableKinesisDataFetcher.java      |  39 +-
 ...inesisDataFetcherForShardConsumerException.java |   4 +-
 .../connectors/kinesis/util/AWSUtilTest.java       |   8 +-
 .../connectors/kinesis/util/AwsV2UtilTest.java     |  38 +-
 .../kinesis/util/KinesisConfigUtilTest.java        |  35 +-
 36 files changed, 2484 insertions(+), 269 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 2a96598..9d22d8c 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -103,6 +103,7 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Amazon AWS SDK v1.x dependencies -->
 		<dependency>
 			<groupId>com.amazonaws</groupId>
 			<artifactId>aws-java-sdk-kinesis</artifactId>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index dde4821..f003b3b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -224,6 +224,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The maximum number of records that will be buffered before suspending consumption of a shard. */
 	public static final String WATERMARK_SYNC_QUEUE_CAPACITY = "flink.watermark.sync.queue.capacity";
 
+	public static final String EFO_HTTP_CLIENT_MAX_CONCURRENCY = "flink.stream.efo.http-client.max-concurrency";
+
 	// ------------------------------------------------------------------------
 	//  Default values for consumer configuration
 	// ------------------------------------------------------------------------
@@ -272,7 +274,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final double DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-	public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 5;
+	public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10;
 
 	public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L;
 
@@ -308,10 +310,21 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;
 
+	public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY = 10_000;
+
 	/**
 	 * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
 	 * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
 	 */
 	public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
 
+	/**
+	 * Build the key of an EFO consumer ARN according to a stream name.
+	 * @param streamName the stream name the key is built upon.
+	 * @return a key of EFO consumer ARN.
+	 */
+	public static String efoConsumerArn(final String streamName) {
+		return EFO_CONSUMER_ARN_PREFIX + "." + streamName;
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index afa1c28..fbf0d01 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -70,7 +70,8 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
 			// use DynamoDBStreamsProxy
-			DynamoDBStreamsProxy::create);
+			DynamoDBStreamsProxy::create,
+			null);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 2be9b1c..133a4d3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,8 +27,10 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
 import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
 import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
@@ -41,8 +43,11 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
 import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
@@ -56,6 +61,9 @@ import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -74,6 +82,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.POLLING;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -183,6 +193,9 @@ public class KinesisDataFetcher<T> {
 	/** The Kinesis proxy factory that will be used to create instances for discovery and shard consumers. */
 	private final FlinkKinesisProxyFactory kinesisProxyFactory;
 
+	/** The Kinesis proxy V2 factory that will be used to create instances for EFO shard consumers. */
+	private final FlinkKinesisProxyV2Factory kinesisProxyV2Factory;
+
 	/** The Kinesis proxy that the fetcher will be using to discover new shards. */
 	private final KinesisProxyInterface kinesis;
 
@@ -243,6 +256,13 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
+	 * Factory to create Kinesis proxy V2 instances used by a fetcher.
+	 */
+	public interface FlinkKinesisProxyV2Factory {
+		KinesisProxyV2Interface create(Properties configProps);
+	}
+
+	/**
 	 * The wrapper that holds the watermark handling related parameters
 	 * of a record produced by the shard consumer thread.
 	 *
@@ -318,14 +338,15 @@ public class KinesisDataFetcher<T> {
 	 * @param configProps the consumer configuration properties
 	 * @param deserializationSchema deserialization schema
 	 */
-	public KinesisDataFetcher(List<String> streams,
-							SourceFunction.SourceContext<T> sourceContext,
-							RuntimeContext runtimeContext,
-							Properties configProps,
-							KinesisDeserializationSchema<T> deserializationSchema,
-							KinesisShardAssigner shardAssigner,
-							AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
-							WatermarkTracker watermarkTracker) {
+	public KinesisDataFetcher(
+			final List<String> streams,
+			final SourceFunction.SourceContext<T> sourceContext,
+			final RuntimeContext runtimeContext,
+			final Properties configProps,
+			final KinesisDeserializationSchema<T> deserializationSchema,
+			final KinesisShardAssigner shardAssigner,
+			final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+			final WatermarkTracker watermarkTracker) {
 		this(streams,
 			sourceContext,
 			sourceContext.getCheckpointLock(),
@@ -338,23 +359,26 @@ public class KinesisDataFetcher<T> {
 			new AtomicReference<>(),
 			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
-			KinesisProxy::create);
+			KinesisProxy::create,
+			KinesisDataFetcher::createKinesisProxyV2);
 	}
 
 	@VisibleForTesting
-	protected KinesisDataFetcher(List<String> streams,
-								SourceFunction.SourceContext<T> sourceContext,
-								Object checkpointLock,
-								RuntimeContext runtimeContext,
-								Properties configProps,
-								KinesisDeserializationSchema<T> deserializationSchema,
-								KinesisShardAssigner shardAssigner,
-								AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
-								WatermarkTracker watermarkTracker,
-								AtomicReference<Throwable> error,
-								List<KinesisStreamShardState> subscribedShardsState,
-								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
-								FlinkKinesisProxyFactory kinesisProxyFactory) {
+	protected KinesisDataFetcher(
+			final List<String> streams,
+			final SourceFunction.SourceContext<T> sourceContext,
+			final Object checkpointLock,
+			final RuntimeContext runtimeContext,
+			final Properties configProps,
+			final KinesisDeserializationSchema<T> deserializationSchema,
+			final KinesisShardAssigner shardAssigner,
+			final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+			final WatermarkTracker watermarkTracker,
+			final AtomicReference<Throwable> error,
+			final List<KinesisStreamShardState> subscribedShardsState,
+			final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
+			final FlinkKinesisProxyFactory kinesisProxyFactory,
+			@Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
 		this.streams = checkNotNull(streams);
 		this.configProps = checkNotNull(configProps);
 		this.sourceContext = checkNotNull(sourceContext);
@@ -367,6 +391,7 @@ public class KinesisDataFetcher<T> {
 		this.periodicWatermarkAssigner = periodicWatermarkAssigner;
 		this.watermarkTracker = watermarkTracker;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
+		this.kinesisProxyV2Factory = kinesisProxyV2Factory;
 		this.kinesis = kinesisProxyFactory.create(configProps);
 		this.recordPublisherFactory = createRecordPublisherFactory();
 
@@ -379,6 +404,7 @@ public class KinesisDataFetcher<T> {
 
 		this.shardConsumersExecutor =
 			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+
 		this.recordEmitter = createRecordEmitter(configProps);
 	}
 
@@ -402,11 +428,11 @@ public class KinesisDataFetcher<T> {
 	 * @return shard consumer
 	 */
 	protected ShardConsumer<T> createShardConsumer(
-		Integer subscribedShardStateIndex,
-		StreamShardHandle subscribedShard,
-		SequenceNumber lastSequenceNum,
-		MetricGroup metricGroup,
-		KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
+			final Integer subscribedShardStateIndex,
+			final StreamShardHandle subscribedShard,
+			final SequenceNumber lastSequenceNum,
+			final MetricGroup metricGroup,
+			final KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
 
 		return new ShardConsumer<>(
 			this,
@@ -418,8 +444,17 @@ public class KinesisDataFetcher<T> {
 			shardDeserializer);
 	}
 
-	private RecordPublisherFactory createRecordPublisherFactory() {
-		return new PollingRecordPublisherFactory(kinesisProxyFactory);
+	protected RecordPublisherFactory createRecordPublisherFactory() {
+		RecordPublisherType recordPublisherType = RecordPublisherType.valueOf(
+			configProps.getProperty(RECORD_PUBLISHER_TYPE, POLLING.name()));
+
+		switch (recordPublisherType) {
+			case EFO:
+				return new FanOutRecordPublisherFactory(kinesisProxyV2Factory.create(configProps));
+			case POLLING:
+			default:
+				return new PollingRecordPublisherFactory(kinesisProxyFactory);
+		}
 	}
 
 	protected RecordPublisher createRecordPublisher(
@@ -432,6 +467,11 @@ public class KinesisDataFetcher<T> {
 		return recordPublisherFactory.create(startingPosition, configProps, metricGroup, subscribedShard);
 	}
 
+	private static KinesisProxyV2Interface createKinesisProxyV2(final Properties configProps) {
+		final KinesisAsyncClient client = AwsV2Util.createKinesisAsyncClient(configProps);
+		return new KinesisProxyV2(client);
+	}
+
 	/**
 	 * Starts the fetcher. After starting the fetcher, it can only
 	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
@@ -672,6 +712,8 @@ public class KinesisDataFetcher<T> {
 			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
 		}
 		shardConsumersExecutor.shutdownNow();
+
+		recordPublisherFactory.close();
 	}
 
 	/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index d9c0d9d..5bf0b09 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -51,6 +53,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+
 	private final KinesisDeserializationSchema<T> deserializer;
 
 	private final int subscribedShardStateIndex;
@@ -102,6 +107,11 @@ public class ShardConsumer<T> implements Runnable {
 		try {
 			while (isRunning()) {
 				final RecordPublisherRunResult result = recordPublisher.run(batch -> {
+					if (!batch.getDeaggregatedRecords().isEmpty()) {
+						LOG.debug("stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
+							subscribedShard.getStreamName(), subscribedShard.getShard().getShardId(),
+							batch.getMillisBehindLatest(), batch.getDeaggregatedRecordSize());
+					}
 					for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
 						if (filterDeaggregatedRecord(userRecord)) {
 							deserializeRecordForCollectionAndUpdateState(userRecord);
@@ -118,7 +128,6 @@ public class ShardConsumer<T> implements Runnable {
 
 				if (result == COMPLETE) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				}
@@ -188,7 +197,7 @@ public class ShardConsumer<T> implements Runnable {
 	 * This method is to support restarting from a partially consumed aggregated sequence number.
 	 *
 	 * @param record the record to filter
-	 * @return {@code true} if the record should be retained
+	 * @return true if the record should be retained
 	 */
 	private boolean filterDeaggregatedRecord(final UserRecord record) {
 		if (!lastSequenceNum.isAggregated()) {
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
index 0dcd0cb..672dc38 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
@@ -45,4 +45,11 @@ public interface RecordPublisherFactory {
 			MetricGroup metricGroup,
 			StreamShardHandle streamShardHandle) throws InterruptedException;
 
+	/**
+	 * Destroy any open resources used by the factory.
+	 */
+	default void close() {
+		// Do nothing by default
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
new file mode 100644
index 0000000..2174029
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.EncryptionType;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static software.amazon.awssdk.services.kinesis.model.StartingPosition.builder;
+
+/**
+ * A {@link RecordPublisher} that will read and forward records from Kinesis using EFO, to the subscriber.
+ * Records are consumed via Enhanced Fan Out subscriptions using SubscribeToShard API.
+ */
+@Internal
+public class FanOutRecordPublisher implements RecordPublisher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FanOutRecordPublisher.class);
+
+	private final FullJitterBackoff backoff;
+
+	private final String consumerArn;
+
+	private final KinesisProxyV2Interface kinesisProxy;
+
+	private final StreamShardHandle subscribedShard;
+
+	private final FanOutRecordPublisherConfiguration configuration;
+
+	/** The current attempt in the case of subsequent recoverable errors. */
+	private int attempt = 0;
+
+	private StartingPosition nextStartingPosition;
+
+	/**
+	 * Instantiate a new FanOutRecordPublisher.
+	 * Consumes data from KDS using EFO SubscribeToShard over AWS SDK V2.x
+	 *
+	 * @param startingPosition the position in the shard to start consuming from
+	 * @param consumerArn the consumer ARN of the stream consumer
+	 * @param subscribedShard the shard to consume from
+	 * @param kinesisProxy the proxy used to talk to Kinesis services
+	 * @param configuration the record publisher configuration
+	 */
+	public FanOutRecordPublisher(
+			final StartingPosition startingPosition,
+			final String consumerArn,
+			final StreamShardHandle subscribedShard,
+			final KinesisProxyV2Interface kinesisProxy,
+			final FanOutRecordPublisherConfiguration configuration,
+			final FullJitterBackoff backoff) {
+		this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
+		this.consumerArn = Preconditions.checkNotNull(consumerArn);
+		this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
+		this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
+		this.configuration = Preconditions.checkNotNull(configuration);
+		this.backoff = Preconditions.checkNotNull(backoff);
+	}
+
+	@Override
+	public RecordPublisherRunResult run(final RecordBatchConsumer recordConsumer) throws InterruptedException {
+		LOG.info("Running fan out record publisher on {}::{} from {} - {}",
+			subscribedShard.getStreamName(),
+			subscribedShard.getShard().getShardId(),
+			nextStartingPosition.getShardIteratorType(),
+			nextStartingPosition.getStartingMarker());
+
+		Consumer<SubscribeToShardEvent> eventConsumer = event -> {
+			RecordBatch recordBatch = new RecordBatch(toSdkV1Records(event.records()), subscribedShard, event.millisBehindLatest());
+			SequenceNumber sequenceNumber = recordConsumer.accept(recordBatch);
+			nextStartingPosition = StartingPosition.continueFromSequenceNumber(sequenceNumber);
+		};
+
+		RecordPublisherRunResult result = runWithBackoff(eventConsumer);
+
+		LOG.info("Subscription expired {}::{}, with status {}",
+			subscribedShard.getStreamName(),
+			subscribedShard.getShard().getShardId(),
+			result);
+
+		return result;
+	}
+
+	/**
+	 * Runs the record publisher, will sleep for configuration computed jitter period in the case of certain exceptions.
+	 * Unrecoverable exceptions are thrown to terminate the application.
+	 *
+	 * @param eventConsumer the consumer to pass events to
+	 * @return {@code COMPLETE} if the shard is complete and this shard consumer should exit
+	 * @throws InterruptedException if the backoff sleep is interrupted
+	 */
+	private RecordPublisherRunResult runWithBackoff(
+			final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
+		FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber(
+			consumerArn,
+			subscribedShard.getShard().getShardId(),
+			kinesisProxy);
+		boolean complete;
+
+		try {
+			complete = fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
+				toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
+			attempt = 0;
+		} catch (FanOutSubscriberException ex) {
+			// We have received an error from the network layer
+			// This can be due to limits being exceeded, network timeouts, etc
+			// We should backoff, reacquire a subscription and try again
+			if (ex.getCause() instanceof ResourceNotFoundException) {
+				LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered. " +
+					"Marking this shard as complete {} ({})", subscribedShard.getShard().getShardId(), consumerArn);
+
+				return COMPLETE;
+			}
+
+			if (attempt == configuration.getSubscribeToShardMaxRetries()) {
+				throw new RuntimeException("Maximum retries exceeded for SubscribeToShard. " +
+					"Failed " + configuration.getSubscribeToShardMaxRetries() + " times.");
+			}
+
+			attempt++;
+			backoff(ex);
+			return INCOMPLETE;
+		}
+
+		return complete ? COMPLETE : INCOMPLETE;
+	}
+
+	private void backoff(final Throwable ex) throws InterruptedException {
+		long backoffMillis = backoff.calculateFullJitterBackoff(
+			configuration.getSubscribeToShardBaseBackoffMillis(),
+			configuration.getSubscribeToShardMaxBackoffMillis(),
+			configuration.getSubscribeToShardExpConstant(),
+			attempt);
+
+		LOG.warn("Encountered recoverable error {}. Backing off for {} millis {} ({})",
+			ex.getCause().getClass().getSimpleName(),
+			backoffMillis,
+			subscribedShard.getShard().getShardId(),
+			consumerArn,
+			ex);
+
+		backoff.sleep(backoffMillis);
+	}
+
+	/**
+	 * Records that come from KPL may be aggregated.
+	 * Records must be deaggregated before they are processed by the application.
+	 * Deaggregation is performed by KCL.
+	 * In order to prevent having to import KCL 1.x and 2.x we convert the records to v1 format and use KCL v1.
+	 *
+	 * @param records the SDK v2 records
+	 * @return records converted to SDK v1 format
+	 */
+	private List<com.amazonaws.services.kinesis.model.Record> toSdkV1Records(final List<Record> records) {
+		final List<com.amazonaws.services.kinesis.model.Record> sdkV1Records = new ArrayList<>();
+
+		for (Record record : records) {
+			sdkV1Records.add(toSdkV1Record(record));
+		}
+
+		return sdkV1Records;
+	}
+
+	private com.amazonaws.services.kinesis.model.Record toSdkV1Record(@Nonnull final Record record) {
+		final com.amazonaws.services.kinesis.model.Record recordV1 = new com.amazonaws.services.kinesis.model.Record()
+			.withData(record.data().asByteBuffer())
+			.withSequenceNumber(record.sequenceNumber())
+			.withPartitionKey(record.partitionKey())
+			.withApproximateArrivalTimestamp(new Date(record.approximateArrivalTimestamp().toEpochMilli()));
+
+		EncryptionType encryptionType = record.encryptionType();
+		if (encryptionType != null) {
+			recordV1.withEncryptionType(encryptionType.name());
+		}
+
+		return recordV1;
+	}
+
+	/**
+	 * Converts a local {@link StartingPosition} to an AWS SDK V2 object representation.
+	 *
+	 * @param startingPosition the local {@link StartingPosition}
+	 * @return an AWS SDK V2 representation
+	 */
+	private software.amazon.awssdk.services.kinesis.model.StartingPosition toSdkV2StartingPosition(StartingPosition startingPosition) {
+		software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder = builder()
+			.type(startingPosition.getShardIteratorType().toString());
+
+		Object marker = startingPosition.getStartingMarker();
+
+		switch (startingPosition.getShardIteratorType()) {
+			case AT_TIMESTAMP: {
+				Preconditions.checkNotNull(marker, "StartingPosition AT_TIMESTAMP date marker is null.");
+				builder.timestamp(((Date) marker).toInstant());
+				break;
+			}
+			case AT_SEQUENCE_NUMBER:
+			case AFTER_SEQUENCE_NUMBER: {
+				Preconditions.checkNotNull(marker, "StartingPosition *_SEQUENCE_NUMBER position is null.");
+				builder.sequenceNumber(marker.toString());
+				break;
+			}
+		}
+
+		return builder.build();
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index 03705cf..89ffad3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -124,7 +124,7 @@ public class FanOutRecordPublisherConfiguration {
 	private final long describeStreamBaseBackoffMillis;
 
 	/**
-	 * Maximum backoff millis for the describe stream operation.
+	 *  Maximum backoff millis for the describe stream operation.
 	 */
 	private final long describeStreamMaxBackoffMillis;
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherFactory.java
similarity index 52%
copy from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
copy to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherFactory.java
index ee5034f..f21bfdc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherFactory.java
@@ -15,74 +15,81 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.FlinkKinesisProxyFactory;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
-import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.util.Preconditions;
 
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+import java.util.Optional;
 import java.util.Properties;
 
+import static java.util.Collections.singletonList;
+
 /**
- * A {@link RecordPublisher} factory used to create instances of {@link PollingRecordPublisher}.
+ * A {@link RecordPublisher} factory used to create instances of {@link FanOutRecordPublisher}.
  */
 @Internal
-public class PollingRecordPublisherFactory implements RecordPublisherFactory {
+public class FanOutRecordPublisherFactory implements RecordPublisherFactory {
 
-	private final FlinkKinesisProxyFactory kinesisProxyFactory;
+	private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
 
-	public PollingRecordPublisherFactory(FlinkKinesisProxyFactory kinesisProxyFactory) {
-		this.kinesisProxyFactory = kinesisProxyFactory;
+	/**
+	 * A singleton {@link KinesisProxyV2} is used per Flink task.
+	 * The {@link KinesisAsyncClient} uses an internal thread pool; using a single client reduces overhead.
+	 */
+	private final KinesisProxyV2Interface kinesisProxy;
+
+	/**
+	 * Instantiate a factory responsible for creating {@link FanOutRecordPublisher}.
+	 *
+	 * @param kinesisProxy the singleton proxy used by all record publishers created by this factory
+	 */
+	public FanOutRecordPublisherFactory(final KinesisProxyV2Interface kinesisProxy) {
+		this.kinesisProxy = kinesisProxy;
 	}
 
 	/**
-	 * Create a {@link PollingRecordPublisher}.
-	 * An {@link AdaptivePollingRecordPublisher} will be created should adaptive reads be enabled in the configuration.
+	 * Create a {@link FanOutRecordPublisher}.
 	 *
-	 * @param startingPosition the position in the shard to start consuming records from
+	 * @param startingPosition the starting position in the shard to start consuming from
 	 * @param consumerConfig the consumer configuration properties
 	 * @param metricGroup the metric group to report metrics to
 	 * @param streamShardHandle the shard this consumer is subscribed to
-	 * @return a {@link PollingRecordPublisher}
+	 * @return a {@link FanOutRecordPublisher}
 	 */
 	@Override
-	public PollingRecordPublisher create(
+	public FanOutRecordPublisher create(
 			final StartingPosition startingPosition,
 			final Properties consumerConfig,
 			final MetricGroup metricGroup,
-			final StreamShardHandle streamShardHandle) throws InterruptedException {
+			final StreamShardHandle streamShardHandle) {
 		Preconditions.checkNotNull(startingPosition);
 		Preconditions.checkNotNull(consumerConfig);
 		Preconditions.checkNotNull(metricGroup);
 		Preconditions.checkNotNull(streamShardHandle);
 
-		final PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(consumerConfig);
-		final PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(metricGroup);
-		final KinesisProxyInterface kinesisProxy = kinesisProxyFactory.create(consumerConfig);
+		String stream = streamShardHandle.getStreamName();
+		FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(consumerConfig, singletonList(stream));
+
+		Optional<String> streamConsumerArn = configuration.getStreamConsumerArn(stream);
+		Preconditions.checkState(streamConsumerArn.isPresent());
+
+		return new FanOutRecordPublisher(startingPosition, streamConsumerArn.get(), streamShardHandle, kinesisProxy, configuration, BACKOFF);
+	}
 
-		if (configuration.isAdaptiveReads()) {
-			return new AdaptivePollingRecordPublisher(
-				startingPosition,
-				streamShardHandle,
-				metricsReporter,
-				kinesisProxy,
-				configuration.getMaxNumberOfRecordsPerFetch(),
-				configuration.getFetchIntervalMillis());
-		} else {
-			return new PollingRecordPublisher(
-				startingPosition,
-				streamShardHandle,
-				metricsReporter,
-				kinesisProxy,
-				configuration.getMaxNumberOfRecordsPerFetch(),
-				configuration.getFetchIntervalMillis());
-		}
+	@Override
+	public void close() {
+		kinesisProxy.close();
 	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
new file mode 100644
index 0000000..0cc1eaf
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do not propagate up to the application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- |                      | --- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * 	 Three types of message are passed over the queue for inter-thread communication:
+ * 	 <ul>
+ * 	   	<li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li>
+ * 	  	<li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li>
+ * 	   	<li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li>
+ * 	 </ul>
+ * </p>
+ * <p>
+ *   The blocking queue has a maximum capacity of 1 record.
+ *   This allows backpressure to be applied closer to the network stack and results in record prefetch.
+ *   At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class):
+ *   <ul>
+ *      <li>1 event being processed by the consumer</li>
+ *      <li>1 event enqueued in the blocking queue</li>
+ *      <li>1 event being added to the queue by the network (blocking)</li>
+ *   </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+	/**
+	 * The maximum capacity of the queue between the network and consumption thread.
+	 * The queue is mainly used to isolate networking from consumption such that errors do not bubble up.
+	 * This queue also acts as a buffer resulting in a record prefetch and reduced latency.
+	 */
+	private static final int QUEUE_CAPACITY = 1;
+
+	/**
+	 * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected error states.
+	 * If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+	 * Under normal conditions heartbeat events are received even when there are no records to consume, so it is not
+	 * expected for this timeout to occur under normal conditions.
+	 */
+	private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+	/** The time to wait when enqueuing events to allow error events to "push in front" of data. */
+	private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+	private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+	private final KinesisProxyV2Interface kinesis;
+
+	private final String consumerArn;
+
+	private final String shardId;
+
+	/**
+	 * Create a new Fan Out subscriber.
+	 *
+	 * @param consumerArn the stream consumer ARN
+	 * @param shardId the shard ID to subscribe to
+	 * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+	 */
+	FanOutShardSubscriber(final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) {
+		this.kinesis = Preconditions.checkNotNull(kinesis);
+		this.consumerArn = Preconditions.checkNotNull(consumerArn);
+		this.shardId = Preconditions.checkNotNull(shardId);
+	}
+
+	/**
+	 * Obtains a subscription to the shard from the specified {@code startingPosition}.
+	 * {@link SubscribeToShardEvent} received from KDS are delivered to the given {@code eventConsumer}.
+	 * Returns false if there are records left to consume from the shard.
+	 *
+	 * @param startingPosition the position in the stream in which to start receiving records
+	 * @param eventConsumer the consumer to deliver received events to
+	 * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained
+	 * @throws FanOutSubscriberException when an exception is propagated from the networking stack
+	 * @throws InterruptedException when the thread is interrupted
+	 */
+	boolean subscribeToShardAndConsumeRecords(
+			final StartingPosition startingPosition,
+			final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException {
+		LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+		try {
+			openSubscriptionToShard(startingPosition);
+		} catch (FanOutSubscriberException ex) {
+			// The only exception that should cause a failure is a ResourceNotFoundException
+			// Rethrow the exception to trigger the application to terminate
+			// (ResourceNotFoundException is unchecked, so it can be rethrown directly without being declared)
+			if (ex.getCause() instanceof ResourceNotFoundException) {
+				throw (ResourceNotFoundException) ex.getCause();
+			}
+
+			throw ex;
+		}
+
+		return consumeAllRecordsFromKinesisShard(eventConsumer);
+	}
+
+	/**
+	 * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a subscription.
+	 * In the event a non-recoverable error occurs this method will rethrow the exception.
+	 * Once the subscription is acquired the client signals to the producer that we are ready to receive records.
+	 *
+	 * @param startingPosition the position in which to start consuming from
+	 * @throws FanOutSubscriberException when an exception is propagated from the networking stack
+	 * @throws InterruptedException when interrupted while waiting for the subscription to be acquired
+	 */
+	private void openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException {
+		SubscribeToShardRequest request = SubscribeToShardRequest.builder()
+			.consumerARN(consumerArn)
+			.shardId(shardId)
+			.startingPosition(startingPosition)
+			.build();
+
+		AtomicReference<Throwable> exception = new AtomicReference<>();
+		CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+		FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch);
+
+		SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
+			.builder()
+			.onError(e -> {
+				// Errors that occur while trying to acquire a subscription are only thrown from here
+				// Errors that occur during the subscription are surfaced here and to the FanOutShardSubscription
+				//	(errors are ignored here once the subscription is open)
+				if (waitForSubscriptionLatch.getCount() > 0) {
+					exception.set(e);
+					waitForSubscriptionLatch.countDown();
+				}
+			})
+			.subscriber(() -> subscription)
+			.build();
+
+		kinesis.subscribeToShard(request, responseHandler);
+
+		waitForSubscriptionLatch.await();
+
+		Throwable throwable = exception.get();
+		if (throwable != null) {
+			handleError(throwable);
+		}
+
+		LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
+
+		// Request the first record to kick off consumption
+		// Following requests are made by the FanOutShardSubscription on the netty thread
+		subscription.requestRecord();
+	}
+
+	/**
+	 * Logs the given error and rethrows it wrapped in a {@link FanOutSubscriberException} for the
+	 * caller to decide how to handle. {@link CompletionException}/{@link ExecutionException} wrappers
+	 * added by the async client are stripped first, so the wrapped cause is the underlying error.
+	 *
+	 * @param throwable the exception that has occurred
+	 * @throws FanOutSubscriberException always thrown, wrapping the unwrapped cause
+	 */
+	private void handleError(final Throwable throwable) throws FanOutSubscriberException {
+		Throwable cause;
+		if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
+			cause = throwable.getCause();
+		} else {
+			cause = throwable;
+		}
+
+		LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} ({})",
+			throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause);
+
+		throw new FanOutSubscriberException(cause);
+	}
+
+	/**
+	 * Once the subscription is open, records will be delivered to the {@link BlockingQueue}.
+	 * Queue capacity is hardcoded to 1 record, the queue is used solely to separate consumption and processing.
+	 * However, this buffer will result in latency reduction as records are pre-fetched as a result.
+	 * This method will poll the queue and exit under any of these conditions:
+	 * - {@code continuationSequenceNumber} is {@code null}, indicating the shard is complete
+	 * - The subscription expires, indicated by a {@link SubscriptionCompleteEvent}
+	 * - There is an error while consuming records, indicated by a {@link SubscriptionErrorEvent}
+	 *
+	 * @param eventConsumer the event consumer to deliver records to
+	 * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained
+	 * @throws FanOutSubscriberException when an exception is propagated from the networking stack
+	 * @throws InterruptedException when the thread is interrupted
+	 */
+	private boolean consumeAllRecordsFromKinesisShard(
+			final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException {
+		String continuationSequenceNumber;
+
+		do {
+			// Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup
+			FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+			if (subscriptionEvent == null) {
+				LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn);
+				return false;
+			} else if (subscriptionEvent.isSubscribeToShardEvent()) {
+				SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
+				continuationSequenceNumber = event.continuationSequenceNumber();
+				// Only deliver events that carry records; empty batches are skipped
+				if (!event.records().isEmpty()) {
+					eventConsumer.accept(event);
+				}
+			} else if (subscriptionEvent.isSubscriptionComplete()) {
+				// The subscription is complete, but the shard might not be, so we return incomplete
+				return false;
+			} else {
+				handleError(subscriptionEvent.getThrowable());
+				// handleError always throws; this return is never reached but satisfies the compiler
+				return false;
+			}
+		} while (continuationSequenceNumber != null);
+
+		return true;
+	}
+
+	/**
+	 * The {@link FanOutShardSubscription} subscribes to the events coming from KDS and adds them to the {@link BlockingQueue}.
+	 * Backpressure is applied based on the maximum capacity of the queue.
+	 * The {@link Subscriber} methods of this class are invoked by a thread from the {@link KinesisAsyncClient}.
+	 */
+	private class FanOutShardSubscription implements Subscriber<SubscribeToShardEventStream> {
+
+		private Subscription subscription;
+
+		private volatile boolean cancelled = false;
+
+		private final CountDownLatch waitForSubscriptionLatch;
+
+		private final Object lockObject = new Object();
+
+		private FanOutShardSubscription(final CountDownLatch waitForSubscriptionLatch) {
+			this.waitForSubscriptionLatch = waitForSubscriptionLatch;
+		}
+
+		/**
+		 * Flag to the producer that we are ready to receive more events.
+		 */
+		void requestRecord() {
+			if (!cancelled) {
+				LOG.debug("Requesting more records from EFO subscription - {} ({})", shardId, consumerArn);
+				subscription.request(1);
+			}
+		}
+
+		@Override
+		public void onSubscribe(Subscription subscription) {
+			this.subscription = subscription;
+			waitForSubscriptionLatch.countDown();
+		}
+
+		@Override
+		public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
+			subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() {
+				@Override
+				public void visit(SubscribeToShardEvent event) {
+					synchronized (lockObject) {
+						if (enqueueEventWithRetry(new SubscriptionNextEvent(event))) {
+							requestRecord();
+						}
+					}
+				}
+			});
+		}
+
+		@Override
+		public void onError(Throwable throwable) {
+			LOG.debug("Error occurred on EFO subscription: {} - ({}).  {} ({})",
+				throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn);
+
+			// Cancel the subscription to signal the onNext to stop queuing and requesting data
+			cancelSubscription();
+
+			synchronized (lockObject) {
+				// Empty the queue and add a poison pill to terminate this subscriber
+				// The synchronized block ensures that new data is not written in the meantime
+				queue.clear();
+				enqueueEvent(new SubscriptionErrorEvent(throwable));
+			}
+		}
+
+		@Override
+		public void onComplete() {
+			LOG.debug("EFO subscription complete - {} ({})", shardId, consumerArn);
+			enqueueEvent(new SubscriptionCompleteEvent());
+		}
+
+		private void cancelSubscription() {
+			if (!cancelled) {
+				cancelled = true;
+				subscription.cancel();
+			}
+		}
+
+		/**
+		 * Continuously attempt to enqueue an event until successful or the subscription is cancelled (due to error).
+		 * When backpressure applied by the consumer exceeds 30s for a single batch, a ReadTimeoutException will be
+		 * thrown by the network stack. This will result in the subscription being cancelled and this event being discarded.
+		 * The subscription would subsequently be reacquired and the discarded data would be fetched again.
+		 *
+		 * @param event the event to enqueue
+		 * @return true if the event was successfully enqueued.
+		 */
+		private boolean enqueueEventWithRetry(final FanOutSubscriptionEvent event) {
+			boolean result = false;
+			do {
+				if (cancelled) {
+					break;
+				}
+
+				// Reentrant with the lock already held by the caller (onNext); guards against racing
+				// with onError, which clears the queue under the same lock
+				synchronized (lockObject) {
+					result = enqueueEvent(event);
+				}
+			} while (!result);
+
+			return result;
+		}
+
+		/**
+		 * Offers the event to the queue.
+		 *
+		 * @param event the event to enqueue
+		 * @return true if the event was successfully enqueued.
+		 */
+		private boolean enqueueEvent(final FanOutSubscriptionEvent event) {
+			try {
+				if (!queue.offer(event, ENQUEUE_WAIT_SECONDS, SECONDS)) {
+					LOG.debug("Timed out enqueuing event {} - {} ({})", event.getClass().getSimpleName(), shardId, consumerArn);
+					return false;
+				}
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new RuntimeException(e);
+			}
+
+			return true;
+		}
+	}
+
+	/**
+	 * An exception wrapper to indicate an error has been thrown from the networking stack.
+	 */
+	static class FanOutSubscriberException extends Exception {
+
+		private static final long serialVersionUID = 2275015497000437736L;
+
+		public FanOutSubscriberException(Throwable cause) {
+			super(cause);
+		}
+	}
+
+	/**
+	 * An interface used to pass messages between {@link FanOutShardSubscription} and {@link FanOutShardSubscriber}
+	 * via the {@link BlockingQueue}.
+	 */
+	private interface FanOutSubscriptionEvent {
+
+		default boolean isSubscribeToShardEvent() {
+			return false;
+		}
+
+		default boolean isSubscriptionComplete() {
+			return false;
+		}
+
+		default SubscribeToShardEvent getSubscribeToShardEvent() {
+			throw new UnsupportedOperationException("This event does not support getSubscribeToShardEvent()");
+		}
+
+		default Throwable getThrowable() {
+			throw new UnsupportedOperationException("This event does not support getThrowable()");
+		}
+	}
+
+	/**
+	 * Indicates that an EFO subscription has completed/expired.
+	 */
+	private static class SubscriptionCompleteEvent implements FanOutSubscriptionEvent {
+
+		@Override
+		public boolean isSubscriptionComplete() {
+			return true;
+		}
+	}
+
+	/**
+	 * Poison pill, indicates that an error occurred while consuming from KDS.
+	 */
+	private static class SubscriptionErrorEvent implements FanOutSubscriptionEvent {
+		private final Throwable throwable;
+
+		private SubscriptionErrorEvent(Throwable throwable) {
+			this.throwable = throwable;
+		}
+
+		@Override
+		public Throwable getThrowable() {
+			return throwable;
+		}
+	}
+
+	/**
+	 * A wrapper to pass the next {@link SubscribeToShardEvent} between threads.
+	 */
+	private static class SubscriptionNextEvent implements FanOutSubscriptionEvent {
+		private final SubscribeToShardEvent subscribeToShardEvent;
+
+		private SubscriptionNextEvent(SubscribeToShardEvent subscribeToShardEvent) {
+			this.subscribeToShardEvent = subscribeToShardEvent;
+		}
+
+		@Override
+		public boolean isSubscribeToShardEvent() {
+			return true;
+		}
+
+		@Override
+		public SubscribeToShardEvent getSubscribeToShardEvent() {
+			return subscribeToShardEvent;
+		}
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
index 4edc1f0..36d3c69 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 
@@ -156,10 +155,6 @@ public class PollingRecordPublisher implements RecordPublisher {
 	 */
 	@Nullable
 	private String getShardIterator() throws InterruptedException {
-		if (nextStartingPosition.getShardIteratorType() == LATEST && subscribedShard.isClosed()) {
-			return null;
-		}
-
 		return kinesisProxy.getShardIterator(
 			subscribedShard,
 			nextStartingPosition.getShardIteratorType().toString(),
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
index ee5034f..00680e8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
@@ -38,7 +38,7 @@ public class PollingRecordPublisherFactory implements RecordPublisherFactory {
 
 	private final FlinkKinesisProxyFactory kinesisProxyFactory;
 
-	public PollingRecordPublisherFactory(FlinkKinesisProxyFactory kinesisProxyFactory) {
+	public PollingRecordPublisherFactory(final FlinkKinesisProxyFactory kinesisProxyFactory) {
 		this.kinesisProxyFactory = kinesisProxyFactory;
 	}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/FullJitterBackoff.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/FullJitterBackoff.java
new file mode 100644
index 0000000..5d2ffa7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/FullJitterBackoff.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Random;
+
+/**
+ * Used to calculate full jitter backoff sleep durations.
+ * @see <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">
+ *        Exponential Backoff and Jitter
+ * 		</a>
+ */
+@Internal
+public class FullJitterBackoff {
+
+	/** Random source used to apply jitter to backoff delays for Kinesis operations. */
+	private final Random seed = new Random();
+
+	/**
+	 * Calculates the sleep time for full jitter based on the given parameters.
+	 * The result is drawn uniformly from [0, min(maxMillis, baseMillis * power^attempt)).
+	 *
+	 * @param baseMillis the base backoff time in milliseconds
+	 * @param maxMillis the maximum backoff time in milliseconds
+	 * @param power the power constant for exponential backoff
+	 * @param attempt the attempt number
+	 * @return the time to wait before trying again
+	 */
+	public long calculateFullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
+		long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
+		return (long) (seed.nextDouble() * exponentialBackoff);
+	}
+
+	/**
+	 * Puts the current thread to sleep for the specified number of millis.
+	 * Simply delegates to {@link Thread#sleep}.
+	 *
+	 * @param millisToSleep the number of milliseconds to sleep for
+	 * @throws InterruptedException if interrupted while sleeping
+	 */
+	public void sleep(long millisToSleep) throws InterruptedException {
+		Thread.sleep(millisToSleep);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 5f2d6d5..b8d3086 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -55,7 +55,6 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -74,12 +73,12 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
+	/** Calculates full jitter backoff delays. */
+	private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
+
 	/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
 	private final AmazonKinesis kinesisClient;
 
-	/** Random seed used to calculate backoff jitter for Kinesis operations. */
-	private static final Random seed = new Random();
-
 	// ------------------------------------------------------------------------
 	//  listShards() related performance settings
 	// ------------------------------------------------------------------------
@@ -206,7 +205,6 @@ public class KinesisProxy implements KinesisProxyInterface {
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
-
 	}
 
 	/**
@@ -230,9 +228,6 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return new KinesisProxy(configProps);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	@Override
 	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
 		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
@@ -247,11 +242,11 @@ public class KinesisProxy implements KinesisProxyInterface {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
 			} catch (SdkClientException ex) {
 				if (isRecoverableSdkClientException(ex)) {
-					long backoffMillis = fullJitterBackoff(
+					long backoffMillis = BACKOFF.calculateFullJitterBackoff(
 						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
 					LOG.warn("Got recoverable SdkClientException. Backing off for "
 						+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
-					Thread.sleep(backoffMillis);
+					BACKOFF.sleep(backoffMillis);
 				} else {
 					throw ex;
 				}
@@ -266,9 +261,6 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return getRecordsResult;
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	@Override
 	public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
 		GetShardListResult result = new GetShardListResult();
@@ -281,9 +273,6 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return result;
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	@Override
 	public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
 		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
@@ -322,11 +311,11 @@ public class KinesisProxy implements KinesisProxyInterface {
 					getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
 			} catch (AmazonServiceException ex) {
 				if (isRecoverableException(ex)) {
-					long backoffMillis = fullJitterBackoff(
+					long backoffMillis = BACKOFF.calculateFullJitterBackoff(
 						getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++);
 					LOG.warn("Got recoverable AmazonServiceException. Backing off for "
 						+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
-					Thread.sleep(backoffMillis);
+					BACKOFF.sleep(backoffMillis);
 				} else {
 					throw ex;
 				}
@@ -438,11 +427,11 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 				listShardsResults = kinesisClient.listShards(listShardsRequest);
 			} catch (LimitExceededException le) {
-				long backoffMillis = fullJitterBackoff(
+				long backoffMillis = BACKOFF.calculateFullJitterBackoff(
 						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
 					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
 									+ ". Backing off for " + backoffMillis + " millis.");
-				Thread.sleep(backoffMillis);
+				BACKOFF.sleep(backoffMillis);
 			} catch (ResourceInUseException reInUse) {
 				if (LOG.isWarnEnabled()) {
 					// List Shards will throw an exception if stream in not in active state. Return and re-use previous state available.
@@ -459,11 +448,11 @@ public class KinesisProxy implements KinesisProxyInterface {
 				break;
 			} catch (SdkClientException ex) {
 				if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
-					long backoffMillis = fullJitterBackoff(
+					long backoffMillis = BACKOFF.calculateFullJitterBackoff(
 						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
 					LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
 						streamName, backoffMillis);
-					Thread.sleep(backoffMillis);
+					BACKOFF.sleep(backoffMillis);
 				} else {
 					// propagate if retries exceeded or not recoverable
 					// (otherwise would return null result and keep trying forever)
@@ -515,14 +504,14 @@ public class KinesisProxy implements KinesisProxyInterface {
 			try {
 				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
 			} catch (LimitExceededException le) {
-				long backoffMillis = fullJitterBackoff(
+				long backoffMillis = BACKOFF.calculateFullJitterBackoff(
 						describeStreamBaseBackoffMillis,
 						describeStreamMaxBackoffMillis,
 						describeStreamExpConstant,
 						attemptCount++);
 				LOG.warn(String.format("Got LimitExceededException when describing stream %s. "
 						+ "Backing off for %d millis.", streamName, backoffMillis));
-				Thread.sleep(backoffMillis);
+				BACKOFF.sleep(backoffMillis);
 			} catch (ResourceNotFoundException re) {
 				throw new RuntimeException("Error while getting stream details", re);
 			}
@@ -541,8 +530,4 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return describeStreamResult;
 	}
 
-	protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
-		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
-		return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
-	}
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 30464f3..db9c7ca 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -71,4 +71,5 @@ public interface KinesisProxyInterface {
 	 *                              if the backoff is interrupted.
 	 */
 	GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
index d1310e5..26908ce 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -21,6 +21,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
@@ -30,10 +34,11 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
 
+	/** An Asynchronous client used to communicate with AWS services. */
 	private final KinesisAsyncClient kinesisAsyncClient;
 
 	/**
-	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
+	 * Create a new KinesisProxyV2 using the provided Async Client.
 	 *
 	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
 	 */
@@ -41,4 +46,16 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface {
 		this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient);
 	}
 
+	@Override
+	public CompletableFuture<Void> subscribeToShard(
+			final SubscribeToShardRequest request,
+			final SubscribeToShardResponseHandler responseHandler) {
+		return kinesisAsyncClient.subscribeToShard(request, responseHandler);
+	}
+
+	@Override
+	public void close() {
+		kinesisAsyncClient.close();
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
index aff6a85..e748eb2 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
@@ -19,10 +19,24 @@ package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.annotation.Internal;
 
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Interface for a Kinesis proxy using AWS SDK v2.x operating on multiple Kinesis streams within the same AWS service region.
  */
 @Internal
 public interface KinesisProxyV2Interface {
 
+	CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler);
+
+	/**
+	 * Destroy any open resources used by the proxy, including the underlying async client.
+	 */
+	default void close() {
+		// Do nothing by default
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 7301e7a..43e47b7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -56,6 +56,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
 
 /**
  * Some utilities specific to Amazon Web Service.
@@ -277,7 +278,16 @@ public class AWSUtil {
 	 * @return the starting position
 	 */
 	public static StartingPosition getStartingPosition(final SequenceNumber sequenceNumber, final Properties configProps) {
-		if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
+		if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+			// LATEST starting positions are translated to AT_TIMESTAMP starting positions. This is to prevent data loss
+			// in the situation where the first read times out and is re-attempted. Consider the following scenario:
+			// 1. Consume from LATEST
+			// 2. No records are consumed and Record Publisher throws retryable error
+			// 3. Restart consumption from LATEST
+			// Any records sent between steps 1 and 3 are lost. Using the timestamp of step 1 allows the consumer to
+			// restart from shard position of step 1, and hence no records are lost.
+			return StartingPosition.fromTimestamp(new Date());
+		} else if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
 			Date timestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(configProps);
 			return StartingPosition.fromTimestamp(timestamp);
 		} else {
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index c4073c3..2326314 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -34,7 +34,9 @@ import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider
 import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.Protocol;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.profiles.ProfileFile;
 import software.amazon.awssdk.regions.Region;
@@ -50,12 +52,19 @@ import java.time.Duration;
 import java.util.Optional;
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
+
 /**
  * Utility methods specific to Amazon Web Service SDK v2.x.
  */
 @Internal
 public class AwsV2Util {
 
+	private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB
+	private static final Duration HEALTH_CHECK_PING_PERIOD = Duration.ofSeconds(60);
+	private static final Duration CONNECTION_ACQUISITION_TIMEOUT = Duration.ofSeconds(60);
+
 	/**
 	 * Creates an Amazon Kinesis Async Client from the provided properties.
 	 * Configuration is copied from AWS SDK v1 configuration class as per:
@@ -65,8 +74,8 @@ public class AwsV2Util {
 	 * @return a new Amazon Kinesis Client
 	 */
 	public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
-		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
-		return createKinesisAsyncClient(configProps, config);
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		return createKinesisAsyncClient(configProps, clientConfiguration);
 	}
 
 	/**
@@ -79,7 +88,7 @@ public class AwsV2Util {
 	 * @return a new Amazon Kinesis Client
 	 */
 	public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps, final ClientConfiguration config) {
-		final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder());
+		final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder(), configProps);
 		final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration(config, ClientOverrideConfiguration.builder());
 		final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
 
@@ -89,13 +98,27 @@ public class AwsV2Util {
 	@VisibleForTesting
 	static SdkAsyncHttpClient createHttpClient(
 			final ClientConfiguration config,
-			final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+			final NettyNioAsyncHttpClient.Builder httpClientBuilder,
+			final Properties consumerConfig) {
+
+		int maxConcurrency = Optional
+			.ofNullable(consumerConfig.getProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY))
+			.map(Integer::parseInt)
+			.orElse(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY);
+
 		httpClientBuilder
-			.maxConcurrency(config.getMaxConnections())
+			.maxConcurrency(maxConcurrency)
 			.connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
 			.writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
 			.connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
-			.useIdleConnectionReaper(config.useReaper());
+			.useIdleConnectionReaper(config.useReaper())
+			.protocol(Protocol.HTTP2)
+			.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT)
+			.http2Configuration(Http2Configuration
+				.builder()
+				.healthCheckPingPeriod(HEALTH_CHECK_PING_PERIOD)
+				.initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
+				.build());
 
 		if (config.getConnectionTTL() > -1) {
 			httpClientBuilder.connectionTimeToLive(Duration.ofMillis(config.getConnectionTTL()));
@@ -248,4 +271,5 @@ public class AwsV2Util {
 	public static Region getRegion(final Properties configProps) {
 		return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
 	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index fc512e7..9626478 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -247,6 +247,9 @@ public class KinesisConfigUtil {
 					ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
 			);
 		}
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
+			"Invalid value given for EFO HTTP client max concurrency. Must be positive.");
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 061120b..8f77d45 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -748,8 +748,9 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 							new AtomicReference<>(),
 							new ArrayList<>(),
 							subscribedStreamsToLastDiscoveredShardIds,
-							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap)
-							) {};
+							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap),
+							null) {
+						};
 					return fetcher;
 				}
 			};
@@ -880,9 +881,8 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 							new AtomicReference<>(),
 							new ArrayList<>(),
 							subscribedStreamsToLastDiscoveredShardIds,
-							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
-								streamToQueueMap)
-						) {
+							(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap),
+							null) {
 							@Override
 							protected void emitWatermark() {
 								// necessary in this test to ensure that watermark state is updated
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 478564b..f6de864 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -26,11 +26,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.testutils.AlwaysThrowsDeserializationSchema;
@@ -67,12 +69,16 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.Collections.singletonList;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -114,7 +120,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 		final TestSourceContext<String> sourceContext = new TestSourceContext<>();
 
 		final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
-			Collections.singletonList(stream),
+			singletonList(stream),
 			sourceContext,
 			TestUtils.getStandardProperties(),
 			new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
@@ -761,7 +767,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 
 		final KinesisDataFetcher<String> fetcher =
 			new TestableKinesisDataFetcher<String>(
-				Collections.singletonList(fakeStream1),
+				singletonList(fakeStream1),
 				sourceContext,
 				new java.util.Properties(),
 				new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
@@ -825,14 +831,14 @@ public class KinesisDataFetcherTest extends TestLogger {
 		Map<String, List<BlockingQueue<String>>> streamsToShardQueues = new HashMap<>();
 		LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
 		queue.put("item1");
-		streamsToShardQueues.put(stream, Collections.singletonList(queue));
+		streamsToShardQueues.put(stream, singletonList(queue));
 
 		AlwaysThrowsDeserializationSchema deserializationSchema = new AlwaysThrowsDeserializationSchema();
 		KinesisProxyInterface fakeKinesis =
 			FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamsToShardQueues);
 
 		TestableKinesisDataFetcherForShardConsumerException<String> fetcher = new TestableKinesisDataFetcherForShardConsumerException<>(
-			Collections.singletonList(stream),
+			singletonList(stream),
 			new TestSourceContext<>(),
 			TestUtils.getStandardProperties(),
 			new KinesisDeserializationSchemaWrapper<>(deserializationSchema),
@@ -841,7 +847,8 @@ public class KinesisDataFetcherTest extends TestLogger {
 			new AtomicReference<>(),
 			new LinkedList<>(),
 			new HashMap<>(),
-			fakeKinesis);
+			fakeKinesis,
+			(sequence, properties, metricGroup, streamShardHandle) -> mock(RecordPublisher.class));
 
 		DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
 			TestUtils.getStandardProperties(), fetcher, 1, 0);
@@ -881,4 +888,32 @@ public class KinesisDataFetcherTest extends TestLogger {
 		assertTrue("Expected Fetcher to have been interrupted. This test didn't accomplish its goal.",
 			fetcher.wasInterrupted);
 	}
+
+	@Test
+	public void testRecordPublisherFactoryIsTornDown() {
+		Properties config = TestUtils.getStandardProperties();
+		config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+
+		KinesisProxyV2Interface kinesisV2 = mock(KinesisProxyV2Interface.class);
+
+		TestableKinesisDataFetcher<String> fetcher =
+			new TestableKinesisDataFetcher<String>(
+				singletonList("fakeStream1"),
+				new TestSourceContext<>(),
+				config,
+				new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+				10,
+				2,
+				new AtomicReference<>(),
+				new LinkedList<>(),
+				new HashMap<>(),
+				mock(KinesisProxyInterface.class),
+				kinesisV2) {
+			};
+
+		fetcher.shutdownFetcher();
+
+		verify(kinesisV2).close();
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
new file mode 100644
index 0000000..c9ae709
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SingleShardFanOutKinesisV2;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.efoProperties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+
+/**
+ * Tests for the {@link ShardConsumer} using fan-out consumption with mocked Kinesis behaviours.
+ */
+public class ShardConsumerFanOutTest {
+
+	@Test
+	public void testEmptyShard() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.emptyShard();
+
+		assertNumberOfMessagesReceivedFromKinesis(0, kinesis, fakeSequenceNumber());
+
+		assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
+	}
+
+	@Test
+	public void testStartFromLatestIsTranslatedToTimestamp() throws Exception {
+		Instant now = Instant.now();
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.boundedShard().build();
+		SequenceNumber sequenceNumber = SENTINEL_LATEST_SEQUENCE_NUM.get();
+
+		// Fake behaviour defaults to 10 messages
+		assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, efoProperties());
+
+		StartingPosition actual = kinesis.getStartingPositionForSubscription(0);
+		assertEquals(AT_TIMESTAMP, actual.type());
+		assertTrue(now.equals(actual.timestamp()) || now.isBefore(actual.timestamp()));
+	}
+
+	@Test
+	public void testStartFromLatestReceivesNoRecordsContinuesToUseTimestamp() throws Exception {
+		AbstractSingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+		SequenceNumber sequenceNumber = SENTINEL_LATEST_SEQUENCE_NUM.get();
+
+		// Fake behaviour emits an empty batch followed by a single record
+		assertNumberOfMessagesReceivedFromKinesis(1, kinesis, sequenceNumber, efoProperties());
+
+		// This fake Kinesis will give 2 subscriptions
+		assertEquals(2, kinesis.getNumberOfSubscribeToShardInvocations());
+
+		assertEquals(AT_TIMESTAMP, kinesis.getStartingPositionForSubscription(0).type());
+		assertEquals(AT_TIMESTAMP, kinesis.getStartingPositionForSubscription(1).type());
+	}
+
+	@Test
+	public void testBoundedShardConsumesFromTimestamp() throws Exception {
+		String format = "yyyy-MM-dd'T'HH:mm";
+		String timestamp = "2020-07-02T09:14";
+		Instant expectedTimestamp = new SimpleDateFormat(format).parse(timestamp).toInstant();
+
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.boundedShard().build();
+
+		Properties consumerConfig = efoProperties();
+		consumerConfig.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+		consumerConfig.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
+		SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();
+
+		// Fake behaviour defaults to 10 messages
+		assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, consumerConfig);
+
+		StartingPosition actual = kinesis.getStartingPositionForSubscription(0);
+		assertEquals(AT_TIMESTAMP, actual.type());
+		assertEquals(expectedTimestamp, actual.timestamp());
+	}
+
+	@Test
+	public void testMillisBehindReported() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withMillisBehindLatest(123L)
+			.build();
+
+		// Fake behaviour defaults to 10 messages
+		ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(10, kinesis, fakeSequenceNumber());
+
+		assertEquals(123L, metrics.getMillisBehindLatest());
+	}
+
+	@Test
+	public void testBoundedShardConsumesCorrectNumberOfMessages() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(10)
+			.withRecordsPerBatch(5)
+			.build();
+
+		// 10 batches of 5 records = 50
+		assertNumberOfMessagesReceivedFromKinesis(50, kinesis, fakeSequenceNumber());
+
+		assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
+	}
+
+	@Test
+	public void testBoundedShardResubscribesToShard() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(100)
+			.withRecordsPerBatch(10)
+			.withBatchesPerSubscription(5)
+			.build();
+
+		// 100 batches of 10 records = 1000
+		assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
+
+		// 100 batches / 5 batches per subscription = 20 subscriptions
+		assertEquals(20, kinesis.getNumberOfSubscribeToShardInvocations());
+
+		// Starting from non-aggregated sequence number means we should start AFTER the sequence number
+		assertEquals(AFTER_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testBoundedShardWithAggregatedRecords() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(100)
+			.withRecordsPerBatch(10)
+			.withAggregationFactor(100)
+			.build();
+
+		// 100 batches of 10 records * 100 aggregation factor = 100000
+		assertNumberOfMessagesReceivedFromKinesis(100000, kinesis, fakeSequenceNumber());
+	}
+
+	@Test
+	public void testBoundedShardResumingConsumptionFromAggregatedSubsequenceNumber() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(10)
+			.withRecordsPerBatch(1)
+			.withAggregationFactor(10)
+			.build();
+
+		SequenceNumber subsequenceNumber = new SequenceNumber("1", 5);
+
+		// 10 batches of 1 record * 10 aggregation factor - 6 previously consumed subsequence records (0,1,2,3,4,5) = 94
+		assertNumberOfMessagesReceivedFromKinesis(94, kinesis, subsequenceNumber);
+
+		// Starting from aggregated sequence number means we should start AT the sequence number
+		assertEquals(AT_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testSubscribeToShardUsesCorrectStartingSequenceNumbers() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(10)
+			.withRecordsPerBatch(1)
+			.withBatchesPerSubscription(2)
+			.build();
+
+		// 10 batches of 1 records = 10
+		assertNumberOfMessagesReceivedFromKinesis(10, kinesis, new SequenceNumber("0"));
+
+		// 10 batches / 2 batches per subscription = 5 subscriptions
+		assertEquals(5, kinesis.getNumberOfSubscribeToShardInvocations());
+
+		// Starting positions should correlate to the last consumed sequence number
+		assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(0), "0");
+		assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(1), "2");
+		assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(2), "4");
+		assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(3), "6");
+		assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(4), "8");
+	}
+
+	private void assertStartingPositionAfterSequenceNumber(
+			final StartingPosition startingPosition,
+			final String sequenceNumber) {
+		assertEquals(AFTER_SEQUENCE_NUMBER, startingPosition.type());
+		assertEquals(sequenceNumber, startingPosition.sequenceNumber());
+	}
+
+	private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+				final int expectedNumberOfMessages,
+				final KinesisProxyV2Interface kinesis,
+				final SequenceNumber startingSequenceNumber) throws Exception {
+		return assertNumberOfMessagesReceivedFromKinesis(
+			expectedNumberOfMessages,
+			kinesis,
+			startingSequenceNumber,
+			efoProperties());
+	}
+
+	private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+			final int expectedNumberOfMessages,
+			final KinesisProxyV2Interface kinesis,
+			final SequenceNumber startingSequenceNumber,
+			final Properties consumerConfig) throws Exception {
+		return ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+			expectedNumberOfMessages,
+			new FanOutRecordPublisherFactory(kinesis),
+			startingSequenceNumber,
+			consumerConfig);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index b39b99e..40a599c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,52 +17,32 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
 import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
-import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
-import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
-import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.StringUtils;
+
 import org.junit.Test;
-import org.mockito.Mockito;
 
-import java.math.BigInteger;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
 import java.util.Date;
-import java.util.LinkedList;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
-import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the {@link ShardConsumer}.
+ * Tests for the {@link ShardConsumer} using polling consumption with mocked Kinesis behaviours.
  */
 public class ShardConsumerTest {
 
@@ -161,83 +141,24 @@ public class ShardConsumerTest {
 		verify(kinesis).getShardIterator(any(), eq("AT_SEQUENCE_NUMBER"), eq("0"));
 	}
 
-	private SequenceNumber fakeSequenceNumber() {
-		return new SequenceNumber("fakeStartingState");
-	}
-
 	private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
-		final int expectedNumberOfMessages,
-		final KinesisProxyInterface kinesis,
-		final SequenceNumber startingSequenceNumber) throws Exception {
+			final int expectedNumberOfMessages,
+			final KinesisProxyInterface kinesis,
+			final SequenceNumber startingSequenceNumber) throws Exception {
 		return assertNumberOfMessagesReceivedFromKinesis(expectedNumberOfMessages, kinesis, startingSequenceNumber, new Properties());
 	}
 
 	private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
-		final int expectedNumberOfMessages,
-		final KinesisProxyInterface kinesis,
-		final SequenceNumber startingSequenceNumber,
-		final Properties consumerProperties) throws Exception {
-		ShardConsumerMetricsReporter shardMetricsReporter = new ShardConsumerMetricsReporter(mock(MetricGroup.class));
-
-		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
-
-		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
-		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-				fakeToBeConsumedShard, startingSequenceNumber));
-
-		TestSourceContext<String> sourceContext = new TestSourceContext<>();
-
-		KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
-			new SimpleStringSchema());
-		TestableKinesisDataFetcher<String> fetcher =
-			new TestableKinesisDataFetcher<>(
-				Collections.singletonList("fakeStream"),
-				sourceContext,
-				consumerProperties,
-				deserializationSchema,
-				10,
-				2,
-				new AtomicReference<>(),
-				subscribedShardsStateUnderTest,
-				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
-				Mockito.mock(KinesisProxyInterface.class));
-
-		final StreamShardHandle shardHandle = subscribedShardsStateUnderTest.get(0).getStreamShardHandle();
-		SequenceNumber lastProcessedSequenceNum = subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum();
-		StartingPosition startingPosition = AWSUtil.getStartingPosition(lastProcessedSequenceNum, consumerProperties);
-
-		final RecordPublisher recordPublisher = new PollingRecordPublisherFactory(config -> kinesis)
-			.create(startingPosition, fetcher.getConsumerConfiguration(), mock(MetricGroup.class), shardHandle);
-
-		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
-		new ShardConsumer<>(
-			fetcher,
-			recordPublisher,
-			shardIndex,
-			shardHandle,
-			lastProcessedSequenceNum,
-			shardMetricsReporter,
-			deserializationSchema)
-			.run();
-
-		assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
-		assertEquals(
-			SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
-			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
-
-		return shardMetricsReporter;
-	}
-
-	private static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
-		return new StreamShardHandle(
-			streamName,
-			new Shard()
-				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardId))
-				.withHashKeyRange(
-					new HashKeyRange()
-						.withStartingHashKey("0")
-						.withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+			final int expectedNumberOfMessages,
+			final KinesisProxyInterface kinesis,
+			final SequenceNumber startingSequenceNumber,
+			final Properties consumerProperties) throws Exception {
+
+		return ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+			expectedNumberOfMessages,
+			new PollingRecordPublisherFactory(config -> kinesis),
+			startingSequenceNumber,
+			consumerProperties);
 	}
 
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
new file mode 100644
index 0000000..3be976a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.StringUtils;
+import org.mockito.Mockito;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Utilities for testing the {@link ShardConsumer}.
+ */
+public class ShardConsumerTestUtils {
+
+	public static <T> ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+				final int expectedNumberOfMessages,
+				final RecordPublisherFactory recordPublisherFactory,
+				final SequenceNumber startingSequenceNumber,
+				final Properties consumerProperties) throws InterruptedException {
+		ShardConsumerMetricsReporter shardMetricsReporter = new ShardConsumerMetricsReporter(mock(MetricGroup.class));
+
+		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
+
+		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+		subscribedShardsStateUnderTest.add(
+			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
+				fakeToBeConsumedShard, startingSequenceNumber));
+
+		TestSourceContext<String> sourceContext = new TestSourceContext<>();
+
+		KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
+			new SimpleStringSchema());
+		TestableKinesisDataFetcher<String> fetcher =
+			new TestableKinesisDataFetcher<>(
+				Collections.singletonList("fakeStream"),
+				sourceContext,
+				consumerProperties,
+				deserializationSchema,
+				10,
+				2,
+				new AtomicReference<>(),
+				subscribedShardsStateUnderTest,
+				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+				Mockito.mock(KinesisProxyInterface.class),
+				Mockito.mock(KinesisProxyV2Interface.class));
+
+		final StreamShardHandle shardHandle = subscribedShardsStateUnderTest.get(0).getStreamShardHandle();
+		final SequenceNumber lastProcessedSequenceNum = subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum();
+		final StartingPosition startingPosition = AWSUtil.getStartingPosition(lastProcessedSequenceNum, consumerProperties);
+
+		final RecordPublisher recordPublisher = recordPublisherFactory
+			.create(startingPosition, fetcher.getConsumerConfiguration(), mock(MetricGroup.class), shardHandle);
+
+		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
+		new ShardConsumer<>(
+			fetcher,
+			recordPublisher,
+			shardIndex,
+			shardHandle,
+			lastProcessedSequenceNum,
+			shardMetricsReporter,
+			deserializationSchema)
+			.run();
+
+		assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
+		assertEquals(
+			SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
+
+		return shardMetricsReporter;
+	}
+
+	public static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
+		return new StreamShardHandle(
+			streamName,
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardId))
+				.withHashKeyRange(
+					new HashKeyRange()
+						.withStartingHashKey("0")
+						.withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+	}
+
+	public static SequenceNumber fakeSequenceNumber() {
+		return new SequenceNumber("fakeStartingState");
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
index ebeaa33..2b4ee99 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
new file mode 100644
index 0000000..97aef9f
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SingleShardFanOutKinesisV2;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.TestConsumer;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2.NUMBER_OF_SUBSCRIPTIONS;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.emptyShard;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.singletonShard;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.LATEST;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+
+/**
+ * Tests for {@link FanOutRecordPublisher}.
+ */
+public class FanOutRecordPublisherTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	private static final long EXPECTED_SUBSCRIBE_TO_SHARD_MAX = 1;
+	private static final long EXPECTED_SUBSCRIBE_TO_SHARD_BASE = 2;
+	private static final double EXPECTED_SUBSCRIBE_TO_SHARD_POW = 0.5;
+	private static final int EXPECTED_SUBSCRIBE_TO_SHARD_RETRIES = 3;
+
+	private static final String DUMMY_SEQUENCE = "1";
+
+	private static final SequenceNumber SEQUENCE_NUMBER = new SequenceNumber(DUMMY_SEQUENCE);
+
+	private static final SequenceNumber AGGREGATED_SEQUENCE_NUMBER = new SequenceNumber(DUMMY_SEQUENCE, 1L);
+
+	@Test
+	public void testToSdkV2StartingPositionAfterSequenceNumber() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+		RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.continueFromSequenceNumber(SEQUENCE_NUMBER));
+		publisher.run(new TestConsumer());
+
+		assertEquals(DUMMY_SEQUENCE, kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+		assertEquals(AFTER_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testToSdkV2StartingPositionAtSequenceNumber() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+		RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.restartFromSequenceNumber(AGGREGATED_SEQUENCE_NUMBER));
+		publisher.run(new TestConsumer());
+
+		assertEquals(DUMMY_SEQUENCE, kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+		assertEquals(AT_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testToSdkV2StartingPositionLatest() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+		RecordPublisher publisher = createRecordPublisher(kinesis, latest());
+		publisher.run(new TestConsumer());
+
+		assertNull(kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+		assertEquals(LATEST, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testToSdkV2StartingPositionTrimHorizon() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+		RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.continueFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
+		publisher.run(new TestConsumer());
+
+		assertNull(kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+		assertEquals(TRIM_HORIZON, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testToSdkV2StartingPositionAtTimeStamp() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = emptyShard();
+		Date now = new Date();
+
+		RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.fromTimestamp(now));
+		publisher.run(new TestConsumer());
+
+		assertEquals(now.toInstant(), kinesis.getStartingPositionForSubscription(0).timestamp());
+		assertEquals(AT_TIMESTAMP, kinesis.getStartingPositionForSubscription(0).type());
+	}
+
+	@Test
+	public void testToSdkV1Records() throws Exception {
+		Date now = new Date();
+		byte[] data = new byte[] { 0, 1, 2, 3 };
+
+		Record record = Record
+			.builder()
+			.approximateArrivalTimestamp(now.toInstant())
+			.partitionKey("pk")
+			.sequenceNumber("sn")
+			.data(SdkBytes.fromByteArray(data))
+			.build();
+
+		KinesisProxyV2Interface kinesis = singletonShard(createSubscribeToShardEvent(record));
+		RecordPublisher publisher = createRecordPublisher(kinesis, latest());
+
+		TestConsumer consumer = new TestConsumer();
+		publisher.run(consumer);
+
+		UserRecord actual = consumer.getRecordBatches().get(0).getDeaggregatedRecords().get(0);
+		assertFalse(actual.isAggregated());
+		assertEquals(now, actual.getApproximateArrivalTimestamp());
+		assertEquals("sn", actual.getSequenceNumber());
+		assertEquals("pk", actual.getPartitionKey());
+		assertThat(toByteArray(actual.getData()), Matchers.equalTo(data));
+	}
+
+	@Test
+	public void testExceptionThrownInConsumerPropagatesToRecordPublisher() throws Exception {
+		thrown.expect(RuntimeException.class);
+		thrown.expectMessage("An error thrown from the consumer");
+
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.boundedShard().build();
+		RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+
+		recordPublisher.run(batch -> {
+			throw new RuntimeException("An error thrown from the consumer");
+		});
+	}
+
+	@Test
+	public void testResourceNotFoundWhenObtainingSubscriptionTerminatesApplication() throws Exception {
+		thrown.expect(ResourceNotFoundException.class);
+
+		KinesisProxyV2Interface kinesis = FakeKinesisFanOutBehavioursFactory.resourceNotFoundWhenObtainingSubscription();
+		RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+
+		recordPublisher.run(new TestConsumer());
+	}
+
+	@Test
+	public void testShardConsumerCompletesIfResourceNotFoundExceptionThrownFromSubscription() throws Exception {
+		ResourceNotFoundException exception = ResourceNotFoundException.builder().build();
+		SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(exception);
+		RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+		TestConsumer consumer = new TestConsumer();
+
+		assertEquals(COMPLETE, recordPublisher.run(consumer));
+
+		// Will exit on the first subscription
+		assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
+	}
+
+	@Test
+	public void testShardConsumerRetriesIfLimitExceededExceptionThrownFromSubscription() throws Exception {
+		LimitExceededException exception = LimitExceededException.builder().build();
+		SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(exception);
+		RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+		TestConsumer consumer = new TestConsumer();
+
+		int count = 0;
+		while (recordPublisher.run(consumer) == INCOMPLETE) {
+			if (++count > NUMBER_OF_SUBSCRIPTIONS + 1) {
+				break;
+			}
+		}
+
+		// An exception is thrown during each of the 5 available subscriptions; the publisher then completes on the 6th attempt
+		assertEquals(NUMBER_OF_SUBSCRIPTIONS + 1, kinesis.getNumberOfSubscribeToShardInvocations());
+	}
+
+	@Test
+	public void testSubscribeToShardBacksOffForRetryableError() throws Exception {
+		LimitExceededException retryableError = LimitExceededException.builder().build();
+		SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(retryableError);
+		FanOutRecordPublisherConfiguration configuration = createConfiguration();
+
+		FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+		when(backoff.calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), anyInt())).thenReturn(100L);
+
+		new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff)
+			.run(new TestConsumer());
+
+		verify(backoff).calculateFullJitterBackoff(
+			EXPECTED_SUBSCRIBE_TO_SHARD_BASE,
+			EXPECTED_SUBSCRIBE_TO_SHARD_MAX,
+			EXPECTED_SUBSCRIBE_TO_SHARD_POW,
+			1
+		);
+
+		verify(backoff).sleep(100L);
+	}
+
+	@Test
+	public void testSubscribeToShardFailsWhenMaxRetriesExceeded() throws Exception {
+		thrown.expect(RuntimeException.class);
+		thrown.expectMessage("Maximum reties exceeded for SubscribeToShard. Failed 3 times."); // NOTE(review): "reties" typo mirrors the production message in FanOutShardSubscriber — fix both together
+
+		Properties efoProperties = createEfoProperties();
+		efoProperties.setProperty(SUBSCRIBE_TO_SHARD_RETRIES, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_RETRIES));
+		FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(efoProperties, emptyList());
+
+		LimitExceededException retryableError = LimitExceededException.builder().build();
+		SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(retryableError);
+		FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+
+		FanOutRecordPublisher recordPublisher = new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff);
+
+		int count = 0;
+		while (recordPublisher.run(new TestConsumer()) == INCOMPLETE) {
+			if (++count > 3) {
+				break;
+			}
+		}
+	}
+
+	@Test
+	public void testSubscribeToShardBacksOffAttemptIncreases() throws Exception {
+		LimitExceededException retryableError = LimitExceededException.builder().build();
+		SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(retryableError);
+		FanOutRecordPublisherConfiguration configuration = createConfiguration();
+
+		FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+
+		FanOutRecordPublisher recordPublisher = new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff);
+
+		recordPublisher.run(new TestConsumer());
+		recordPublisher.run(new TestConsumer());
+		recordPublisher.run(new TestConsumer());
+
+		verify(backoff).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(1));
+		verify(backoff).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(2));
+		verify(backoff).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(3));
+
+		verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(0));
+		verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(4));
+	}
+
+	@Test
+	public void testBackOffAttemptResetsWithSuccessfulSubscription() throws Exception {
+		SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.alternatingSuccessErrorDuringSubscription();
+		FanOutRecordPublisherConfiguration configuration = createConfiguration();
+
+		FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+
+		FanOutRecordPublisher recordPublisher = new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff);
+
+		recordPublisher.run(new TestConsumer());
+		recordPublisher.run(new TestConsumer());
+		recordPublisher.run(new TestConsumer());
+
+		// Expecting:
+		// - first attempt to fail, and backoff attempt #1
+		// - second attempt to succeed, and reset attempt index
+		// - third attempt to fail, and backoff attempt #1
+
+		verify(backoff, times(2)).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(1));
+
+		verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(0));
+		verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(2));
+	}
+
+	@Test
+	public void testRecordDurability() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(10)
+			.withBatchesPerSubscription(3)
+			.withRecordsPerBatch(12)
+			.build();
+
+		RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+		TestConsumer consumer = new TestConsumer();
+
+		int count = 0;
+		while (recordPublisher.run(consumer) == INCOMPLETE) {
+			if (++count > 4) {
+				break;
+			}
+		}
+
+		List<UserRecord> userRecords = flattenToUserRecords(consumer.getRecordBatches());
+
+		// Should have received 10 * 12 = 120 records
+		assertEquals(120, userRecords.size());
+
+		int expectedSequenceNumber = 1;
+		for (UserRecord record : userRecords) {
+			assertEquals(String.valueOf(expectedSequenceNumber++), record.getSequenceNumber());
+		}
+	}
+
+	@Test
+	public void testAggregatedRecordDurability() throws Exception {
+		SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+			.boundedShard()
+			.withBatchCount(10)
+			.withAggregationFactor(5)
+			.withRecordsPerBatch(12)
+			.build();
+
+		RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+		TestConsumer consumer = new TestConsumer();
+
+		int count = 0;
+		while (recordPublisher.run(consumer) == INCOMPLETE) {
+			if (++count > 5) {
+				break;
+			}
+		}
+
+		List<UserRecord> userRecords = flattenToUserRecords(consumer.getRecordBatches());
+
+		// Should have received 10 * 12 * 5 = 600 records
+		assertEquals(600, userRecords.size());
+
+		int sequence = 1;
+		long subsequence = 0;
+		for (UserRecord userRecord : userRecords) {
+			assertEquals(String.valueOf(sequence), userRecord.getSequenceNumber());
+			assertEquals(subsequence++, userRecord.getSubSequenceNumber());
+
+			if (subsequence == 5) {
+				sequence++;
+				subsequence = 0;
+			}
+		}
+	}
+
+	private List<UserRecord> flattenToUserRecords(final List<RecordBatch> recordBatch) {
+		return recordBatch
+			.stream()
+			.flatMap(b -> b.getDeaggregatedRecords().stream())
+			.collect(Collectors.toList());
+	}
+
+	private byte[] toByteArray(final ByteBuffer byteBuffer) {
+		byte[] dataBytes = new byte[byteBuffer.remaining()];
+		byteBuffer.get(dataBytes);
+		return dataBytes;
+	}
+
+	private RecordPublisher createRecordPublisher(final KinesisProxyV2Interface kinesis) {
+		return createRecordPublisher(kinesis, latest());
+	}
+
+	private RecordPublisher createRecordPublisher(final KinesisProxyV2Interface kinesis, final StartingPosition startingPosition) {
+		return new FanOutRecordPublisher(startingPosition, "arn", createDummyStreamShardHandle(), kinesis, createConfiguration(), new FullJitterBackoff());
+	}
+
+	private FanOutRecordPublisherConfiguration createConfiguration() {
+		return new FanOutRecordPublisherConfiguration(createEfoProperties(), emptyList());
+	}
+
+	private Properties createEfoProperties() {
+		Properties config = new Properties();
+		config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+		config.setProperty(EFO_CONSUMER_NAME, "dummy-efo-consumer");
+		config.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_BASE, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_BASE));
+		config.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_MAX, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_MAX));
+		config.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_POW));
+		return config;
+	}
+
+	private SubscribeToShardEvent createSubscribeToShardEvent(final Record...records) {
+		return SubscribeToShardEvent
+			.builder()
+			.records(records)
+			.build();
+	}
+
+	private StartingPosition latest() {
+		return StartingPosition.continueFromSequenceNumber(SENTINEL_LATEST_SEQUENCE_NUM.get());
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
index 2bbe3da..6b6107e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -18,23 +18,17 @@
 package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
 
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
-import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordBatchConsumer;
 import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.TestConsumer;
 
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
@@ -62,9 +56,9 @@ public class PollingRecordPublisherTest {
 		TestConsumer consumer = new TestConsumer();
 		recordPublisher.run(consumer);
 
-		assertEquals(1, consumer.recordBatches.size());
-		assertEquals(5, consumer.recordBatches.get(0).getDeaggregatedRecordSize());
-		assertEquals(100L, consumer.recordBatches.get(0).getMillisBehindLatest(), 0);
+		assertEquals(1, consumer.getRecordBatches().size());
+		assertEquals(5, consumer.getRecordBatches().get(0).getDeaggregatedRecordSize());
+		assertEquals(100L, consumer.getRecordBatches().get(0).getMillisBehindLatest(), 0);
 	}
 
 	@Test
@@ -146,20 +140,4 @@ public class PollingRecordPublisherTest {
 			500L);
 	}
 
-	private static class TestConsumer implements RecordBatchConsumer {
-		private final List<RecordBatch> recordBatches = new ArrayList<>();
-		private String latestSequenceNumber;
-
-		@Override
-		public SequenceNumber accept(final RecordBatch batch) {
-			recordBatches.add(batch);
-
-			if (batch.getDeaggregatedRecordSize() > 0) {
-				List<UserRecord> records = batch.getDeaggregatedRecords();
-				latestSequenceNumber = records.get(records.size() - 1).getSequenceNumber();
-			}
-
-			return new SequenceNumber(latestSequenceNumber);
-		}
-	}
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Test.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Test.java
new file mode 100644
index 0000000..f7641c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Test.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link KinesisProxyV2}.
+ */
+public class KinesisProxyV2Test {
+
+	@Test
+	public void testSubscribeToShard() {
+		KinesisAsyncClient kinesis = mock(KinesisAsyncClient.class);
+		KinesisProxyV2 proxy = new KinesisProxyV2(kinesis);
+
+		SubscribeToShardRequest request = SubscribeToShardRequest.builder().build();
+		SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
+			.builder()
+			.subscriber(event -> {})
+			.build();
+
+		proxy.subscribeToShard(request, responseHandler);
+
+		verify(kinesis).subscribeToShard(eq(request), eq(responseHandler));
+	}
+
+	@Test
+	public void testCloseInvokesClientClose() {
+		KinesisAsyncClient kinesis = mock(KinesisAsyncClient.class);
+		KinesisProxyV2 proxy = new KinesisProxyV2(kinesis);
+
+		proxy.close();
+
+		verify(kinesis).close();
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
new file mode 100644
index 0000000..83e299a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+
+import com.amazonaws.kinesis.agg.RecordAggregator;
+import org.apache.commons.lang3.NotImplementedException;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyV2Interface} interface.
+ */
+public class FakeKinesisFanOutBehavioursFactory {
+
+	public static SingleShardFanOutKinesisV2.Builder boundedShard() {
+		return new SingleShardFanOutKinesisV2.Builder();
+	}
+
+	public static KinesisProxyV2Interface singletonShard(final SubscribeToShardEvent event) {
+		return new SingletonEventFanOutKinesisV2(event);
+	}
+
+	public static SingleShardFanOutKinesisV2 emptyShard() {
+		return new SingleShardFanOutKinesisV2.Builder().withBatchCount(0).build();
+	}
+
+	public static KinesisProxyV2Interface resourceNotFoundWhenObtainingSubscription() {
+		return new ExceptionalKinesisV2(ResourceNotFoundException.builder().build());
+	}
+
+	public static SubscriptionErrorKinesisV2 errorDuringSubscription(final Throwable throwable) {
+		return new SubscriptionErrorKinesisV2(throwable);
+	}
+
+	public static SubscriptionErrorKinesisV2 alternatingSuccessErrorDuringSubscription() {
+		return new AlternatingSubscriptionErrorKinesisV2(LimitExceededException.builder().build());
+	}
+
+	public static AbstractSingleShardFanOutKinesisV2 emptyBatchFollowedBySingleRecord() {
+		return new AbstractSingleShardFanOutKinesisV2(2) {
+			private int subscription = 0;
+
+			@Override
+			void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+				SubscribeToShardEvent.Builder builder = SubscribeToShardEvent
+					.builder()
+					.continuationSequenceNumber(subscription == 0 ? "1" : null);
+
+				if (subscription == 1) {
+					builder.records(createRecord(new AtomicInteger(1)));
+				}
+
+				subscriber.onNext(builder.build());
+				subscription++;
+			}
+		};
+	}
+
+	/**
+	 * A fake Kinesis offering a bounded number of subscriptions (NUMBER_OF_SUBSCRIPTIONS), each with 5 records, alternating throwing the given exception.
+	 * The first subscription fails with the exception, the second completes successfully, and so on.
+	 */
+	private static class AlternatingSubscriptionErrorKinesisV2 extends SubscriptionErrorKinesisV2 {
+
+		int index = 0;
+
+		private AlternatingSubscriptionErrorKinesisV2(final Throwable throwable) {
+			super(throwable);
+		}
+
+		@Override
+		void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+			if (index % 2 == 0) {
+				super.sendEvents(subscriber);
+			} else {
+				super.sendEventBatch(subscriber);
+				subscriber.onComplete();
+			}
+
+			index++;
+		}
+	}
+
+	/**
+	 * A fake Kinesis in which each subscription sends 5 records and then terminates
+	 * with the given exception. A total of 5 event-bearing subscriptions can be
+	 * acquired (see {@link AbstractSingleShardFanOutKinesisV2}).
+	 */
+	public static class SubscriptionErrorKinesisV2 extends AbstractSingleShardFanOutKinesisV2 {
+
+		public static final int NUMBER_OF_SUBSCRIPTIONS = 5;
+
+		public static final int NUMBER_OF_EVENTS_PER_SUBSCRIPTION = 5;
+
+		// The error delivered to the subscriber after each batch of events.
+		private final Throwable throwable;
+
+		// Monotonic sequence number shared across all records and subscriptions.
+		AtomicInteger sequenceNumber = new AtomicInteger();
+
+		private SubscriptionErrorKinesisV2(final Throwable throwable) {
+			super(NUMBER_OF_SUBSCRIPTIONS);
+			this.throwable = throwable;
+		}
+
+		@Override
+		void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+			sendEventBatch(subscriber);
+			// Fail the subscription once the batch has been delivered.
+			subscriber.onError(throwable);
+		}
+
+		// Emits NUMBER_OF_EVENTS_PER_SUBSCRIPTION single-record events, each
+		// carrying a non-null continuation sequence number.
+		void sendEventBatch(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+			for (int i = 0; i < NUMBER_OF_EVENTS_PER_SUBSCRIPTION; i++) {
+				subscriber.onNext(SubscribeToShardEvent
+					.builder()
+					.records(createRecord(sequenceNumber))
+					.continuationSequenceNumber(String.valueOf(i))
+					.build());
+			}
+		}
+	}
+
+	/**
+	 * A fake Kinesis whose subscribeToShard immediately reports the given
+	 * exception via {@code SubscribeToShardResponseHandler#exceptionOccurred}
+	 * and returns an already-completed future.
+	 */
+	private static class ExceptionalKinesisV2 extends KinesisProxyV2InterfaceAdapter {
+
+		private final RuntimeException exception;
+
+		private ExceptionalKinesisV2(RuntimeException exception) {
+			this.exception = exception;
+		}
+
+		@Override
+		public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
+			responseHandler.exceptionOccurred(exception);
+			return CompletableFuture.completedFuture(null);
+		}
+	}
+
+	/**
+	 * A fake Kinesis that serves a single subscription delivering exactly the
+	 * given {@link SubscribeToShardEvent}.
+	 */
+	private static class SingletonEventFanOutKinesisV2 extends AbstractSingleShardFanOutKinesisV2 {
+
+		// The one event replayed to the subscriber.
+		private final SubscribeToShardEvent event;
+
+		private SingletonEventFanOutKinesisV2(SubscribeToShardEvent event) {
+			super(1);
+			this.event = event;
+		}
+
+		@Override
+		void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+			subscriber.onNext(event);
+		}
+	}
+
+	/**
+	 * A fake implementation of KinesisProxyV2 SubscribeToShard that provides dummy records for EFO subscriptions.
+	 * Aggregated and non-aggregated records are supported with various batch and subscription sizes.
+	 * Instances are created via the nested {@link Builder}.
+	 */
+	public static class SingleShardFanOutKinesisV2 extends AbstractSingleShardFanOutKinesisV2 {
+
+		// Maximum number of events emitted per subscription.
+		private final int batchesPerSubscription;
+
+		// Number of records attached to each event.
+		private final int recordsPerBatch;
+
+		// Reported consumer lag, copied verbatim onto every event.
+		private final long millisBehindLatest;
+
+		// Total number of Kinesis records to serve across all subscriptions.
+		private final int totalRecords;
+
+		// Number of user records packed into each aggregated record (1 = no aggregation).
+		private final int aggregationFactor;
+
+		// Monotonic sequence number; also the count of Kinesis records produced so far.
+		private final AtomicInteger sequenceNumber = new AtomicInteger();
+
+		private SingleShardFanOutKinesisV2(final Builder builder) {
+			super(builder.getSubscriptionCount());
+			this.batchesPerSubscription = builder.batchesPerSubscription;
+			this.recordsPerBatch = builder.recordsPerBatch;
+			this.millisBehindLatest = builder.millisBehindLatest;
+			this.aggregationFactor = builder.aggregationFactor;
+			this.totalRecords = builder.getTotalRecords();
+		}
+
+		@Override
+		void sendEvents(final Subscriber<? super SubscribeToShardEventStream> subscriber) {
+			SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent
+				.builder()
+				.millisBehindLatest(millisBehindLatest);
+
+			// Emit events until the per-subscription budget or the total record count is hit.
+			for (int batchIndex = 0; batchIndex < batchesPerSubscription && sequenceNumber.get() < totalRecords; batchIndex++) {
+				List<Record> records = new ArrayList<>();
+
+				for (int i = 0; i < recordsPerBatch; i++) {
+					final Record record;
+
+					if (aggregationFactor == 1) {
+						record = createRecord(sequenceNumber);
+					} else {
+						record = createAggregatedRecord(aggregationFactor, sequenceNumber);
+					}
+
+					records.add(record);
+				}
+
+				eventBuilder.records(records);
+
+				// A null continuation sequence number marks the final event.
+				String continuation = sequenceNumber.get() < totalRecords ? String.valueOf(sequenceNumber.get() + 1) : null;
+				eventBuilder.continuationSequenceNumber(continuation);
+
+				subscriber.onNext(eventBuilder.build());
+			}
+		}
+
+		/**
+		 * A convenience builder for {@link SingleShardFanOutKinesisV2}.
+		 */
+		public static class Builder {
+			private int batchesPerSubscription = 100000;
+			private int recordsPerBatch = 10;
+			private long millisBehindLatest = 0;
+			private int batchCount = 1;
+			private int aggregationFactor = 1;
+
+			// Number of subscriptions needed to drain all records, at
+			// batchesPerSubscription * recordsPerBatch records per subscription.
+			public int getSubscriptionCount() {
+				return (int) Math.ceil((double) getTotalRecords() / batchesPerSubscription / recordsPerBatch);
+			}
+
+			public int getTotalRecords() {
+				return batchCount * recordsPerBatch;
+			}
+
+			public Builder withBatchesPerSubscription(final int batchesPerSubscription) {
+				this.batchesPerSubscription = batchesPerSubscription;
+				return this;
+			}
+
+			public Builder withRecordsPerBatch(final int recordsPerBatch) {
+				this.recordsPerBatch = recordsPerBatch;
+				return this;
+			}
+
+			public Builder withBatchCount(final int batchCount) {
+				this.batchCount = batchCount;
+				return this;
+			}
+
+			public Builder withMillisBehindLatest(final long millisBehindLatest) {
+				this.millisBehindLatest = millisBehindLatest;
+				return this;
+			}
+
+			public Builder withAggregationFactor(final int aggregationFactor) {
+				this.aggregationFactor = aggregationFactor;
+				return this;
+			}
+
+			public SingleShardFanOutKinesisV2 build() {
+				return new SingleShardFanOutKinesisV2(this);
+			}
+		}
+	}
+
+	/**
+	 * A single shard dummy EFO implementation that provides basic responses and subscription management.
+	 * Does not provide any records; subclasses implement {@link #sendEvents} to supply them.
+	 */
+	public abstract static class AbstractSingleShardFanOutKinesisV2 extends KinesisProxyV2InterfaceAdapter {
+
+		// Every SubscribeToShardRequest received, in arrival order (for test assertions).
+		private final List<SubscribeToShardRequest> requests = new ArrayList<>();
+
+		// Number of event-bearing subscriptions left; once exhausted, a subscription
+		// receives a single empty event with a null continuation sequence number.
+		private int remainingSubscriptions;
+
+		private AbstractSingleShardFanOutKinesisV2(final int remainingSubscriptions) {
+			this.remainingSubscriptions = remainingSubscriptions;
+		}
+
+		public int getNumberOfSubscribeToShardInvocations() {
+			return requests.size();
+		}
+
+		/**
+		 * Returns the starting position of the n-th recorded subscription request.
+		 *
+		 * @param subscriptionIndex zero-based index into the recorded requests
+		 */
+		public StartingPosition getStartingPositionForSubscription(final int subscriptionIndex) {
+			assertTrue(subscriptionIndex >= 0);
+			assertTrue(subscriptionIndex < getNumberOfSubscribeToShardInvocations());
+
+			return requests.get(subscriptionIndex).startingPosition();
+		}
+
+		@Override
+		public CompletableFuture<Void> subscribeToShard(
+			final SubscribeToShardRequest request,
+			final SubscribeToShardResponseHandler responseHandler) {
+
+			requests.add(request);
+
+			// Deliver the response and event stream asynchronously, mimicking the SDK client.
+			return CompletableFuture.supplyAsync(() -> {
+				responseHandler.responseReceived(SubscribeToShardResponse.builder().build());
+
+				responseHandler.onEventStream(subscriber -> {
+					subscriber.onSubscribe(mock(Subscription.class));
+
+					if (remainingSubscriptions > 0) {
+						sendEvents(subscriber);
+						remainingSubscriptions--;
+					} else {
+						// Exhausted: send a single empty event with no continuation.
+						SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent
+							.builder()
+							.millisBehindLatest(0L)
+							.continuationSequenceNumber(null);
+
+						subscriber.onNext(eventBuilder.build());
+					}
+
+					// Every subscription is completed after its events are delivered.
+					subscriber.onComplete();
+				});
+
+				return null;
+			});
+		}
+
+		// Subclasses push their SubscribeToShardEventStream events to the subscriber here.
+		abstract void sendEvents(final Subscriber<? super SubscribeToShardEventStream> subscriber);
+
+	}
+
+	/**
+	 * An adapter for {@link KinesisProxyV2Interface} whose subscribeToShard throws
+	 * {@code NotImplementedException}, so fakes only override what they actually use.
+	 */
+	private static class KinesisProxyV2InterfaceAdapter implements KinesisProxyV2Interface {
+
+		@Override
+		public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
+			throw new NotImplementedException("This method is not implemented.");
+		}
+	}
+
+	// Creates a dummy record whose payload is 32 random alphabetic bytes.
+	private static Record createRecord(final AtomicInteger sequenceNumber) {
+		return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber);
+	}
+
+	/**
+	 * Creates a record carrying the given payload. The sequence number counter is
+	 * incremented before use, so numbering starts at 1.
+	 */
+	private static Record createRecord(final byte[] data, final AtomicInteger sequenceNumber) {
+		return Record
+			.builder()
+			.approximateArrivalTimestamp(Instant.now())
+			.data(SdkBytes.fromByteArray(data))
+			.sequenceNumber(String.valueOf(sequenceNumber.incrementAndGet()))
+			.partitionKey("pk")
+			.build();
+	}
+
+	/**
+	 * Creates a single aggregated record (via the KPL {@link RecordAggregator})
+	 * containing {@code aggregationFactor} user records of 32 random bytes each.
+	 * Consumes exactly one sequence number regardless of the aggregation factor.
+	 */
+	private static Record createAggregatedRecord(final int aggregationFactor, final AtomicInteger sequenceNumber) {
+		RecordAggregator recordAggregator = new RecordAggregator();
+
+		for (int i = 0; i < aggregationFactor; i++) {
+			try {
+				recordAggregator.addUserRecord("pk", randomAlphabetic(32).getBytes(UTF_8));
+			} catch (Exception e) {
+				// addUserRecord declares a checked exception; rethrow unchecked for test code.
+				throw new RuntimeException(e);
+			}
+		}
+
+		return createRecord(recordAggregator.clearAndGet().toRecordBytes(), sequenceNumber);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
index 7c5c786..abb186b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -19,10 +19,14 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
 import com.amazonaws.kinesis.agg.AggRecord;
 import com.amazonaws.kinesis.agg.RecordAggregator;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -39,6 +43,12 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
+
 /**
  * General test utils.
  */
@@ -116,4 +126,35 @@ public class TestUtils {
 		return new StreamShardHandle(streamName, shard);
 	}
 
+	/**
+	 * Builds consumer properties selecting the EFO record publisher with
+	 * registration type NONE and a pre-registered consumer ARN for the
+	 * stream named "fakeStream".
+	 *
+	 * @return consumer configuration properties for EFO tests
+	 */
+	public static Properties efoProperties() {
+		Properties consumerConfig = new Properties();
+		consumerConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+		consumerConfig.setProperty(EFO_REGISTRATION_TYPE, NONE.name());
+		consumerConfig.setProperty(EFO_CONSUMER_ARN_PREFIX + "." + "fakeStream", "stream-consumer-arn");
+		return consumerConfig;
+	}
+
+	/**
+	 * A test record consumer used to capture messages from kinesis.
+	 * All accepted batches are retained for inspection via {@link #getRecordBatches()}.
+	 */
+	public static class TestConsumer implements RecordPublisher.RecordBatchConsumer {
+		private final List<RecordBatch> recordBatches = new ArrayList<>();
+		// Sequence number of the final record of the last non-empty batch;
+		// left unchanged when an empty batch arrives.
+		private String latestSequenceNumber;
+
+		@Override
+		public SequenceNumber accept(final RecordBatch batch) {
+			recordBatches.add(batch);
+
+			// Empty batches do not advance the sequence number.
+			if (batch.getDeaggregatedRecordSize() > 0) {
+				List<UserRecord> records = batch.getDeaggregatedRecords();
+				latestSequenceNumber = records.get(records.size() - 1).getSequenceNumber();
+			}
+
+			return new SequenceNumber(latestSequenceNumber);
+		}
+
+		public List<RecordBatch> getRecordBatches() {
+			return recordBatches;
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 3bb11bd..cd32361 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetche
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
 import org.mockito.Mockito;
@@ -48,13 +49,39 @@ import static org.mockito.Mockito.when;
  */
 public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
-	private OneShotLatch runWaiter;
-	private OneShotLatch initialDiscoveryWaiter;
-	private OneShotLatch shutdownWaiter;
+	private final OneShotLatch runWaiter;
+	private final OneShotLatch initialDiscoveryWaiter;
+	private final OneShotLatch shutdownWaiter;
 
 	private volatile boolean running;
 
 	public TestableKinesisDataFetcher(
+		List<String> fakeStreams,
+		SourceFunction.SourceContext<T> sourceContext,
+		Properties fakeConfiguration,
+		KinesisDeserializationSchema<T> deserializationSchema,
+		int fakeTotalCountOfSubtasks,
+		int fakeIndexOfThisSubtask,
+		AtomicReference<Throwable> thrownErrorUnderTest,
+		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
+		HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+		KinesisProxyInterface fakeKinesis) {
+
+		this(
+			fakeStreams,
+			sourceContext,
+			fakeConfiguration,
+			deserializationSchema,
+			fakeTotalCountOfSubtasks,
+			fakeIndexOfThisSubtask,
+			thrownErrorUnderTest,
+			subscribedShardsStateUnderTest,
+			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+			fakeKinesis,
+			null);
+	}
+
+	public TestableKinesisDataFetcher(
 			List<String> fakeStreams,
 			SourceFunction.SourceContext<T> sourceContext,
 			Properties fakeConfiguration,
@@ -64,7 +91,8 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 			AtomicReference<Throwable> thrownErrorUnderTest,
 			LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
 			HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
-			KinesisProxyInterface fakeKinesis) {
+			KinesisProxyInterface fakeKinesis,
+			KinesisProxyV2Interface fakeKinesisV2) {
 		super(
 			fakeStreams,
 			sourceContext,
@@ -78,7 +106,8 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 			thrownErrorUnderTest,
 			subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
-			(properties) -> fakeKinesis);
+			properties -> fakeKinesis,
+			properties -> fakeKinesisV2);
 
 		this.runWaiter = new OneShotLatch();
 		this.initialDiscoveryWaiter = new OneShotLatch();
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index 74bb0c5..a45a463 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -55,7 +56,8 @@ public class TestableKinesisDataFetcherForShardConsumerException<T> extends Test
 			final AtomicReference<Throwable> thrownErrorUnderTest,
 			final LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
 			final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
-			final KinesisProxyInterface fakeKinesis) {
+			final KinesisProxyInterface fakeKinesis,
+			final RecordPublisherFactory recordPublisherFactory) {
 		super(fakeStreams, sourceContext, fakeConfiguration, deserializationSchema, fakeTotalCountOfSubtasks,
 			fakeIndexOfThisSubtask, thrownErrorUnderTest, subscribedShardsStateUnderTest,
 			subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
index e66ff51..cdb871b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
@@ -40,7 +40,6 @@ import java.util.Date;
 import java.util.Properties;
 
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
 import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
 import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
 import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.AUTO;
@@ -52,7 +51,7 @@ import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequen
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -61,6 +60,7 @@ import static org.junit.Assert.assertTrue;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(AWSUtil.class)
 public class AWSUtilTest {
+
 	@Rule
 	private final ExpectedException exception = ExpectedException.none();
 
@@ -214,8 +214,8 @@ public class AWSUtilTest {
 	public void testGetStartingPositionForLatest() {
 		StartingPosition position = AWSUtil.getStartingPosition(SENTINEL_LATEST_SEQUENCE_NUM.get(), new Properties());
 
-		assertEquals(LATEST, position.getShardIteratorType());
-		assertNull(position.getStartingMarker());
+		assertEquals(AT_TIMESTAMP, position.getShardIteratorType());
+		assertNotNull(position.getStartingMarker());
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 0862ec2..4d96ab8 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -33,6 +33,7 @@ import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsPr
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
@@ -48,6 +49,8 @@ import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigCons
 import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
 import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
 import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -59,6 +62,7 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.http.Protocol.HTTP2;
 
 /**
  * Tests for {@link AwsV2Util}.
@@ -227,39 +231,40 @@ public class AwsV2UtilTest {
 		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
 
 		verify(builder).build();
-		verify(builder).maxConcurrency(50);
+		verify(builder).maxConcurrency(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY);
 		verify(builder).connectionTimeout(Duration.ofSeconds(10));
 		verify(builder).writeTimeout(Duration.ofSeconds(50));
 		verify(builder).connectionMaxIdleTime(Duration.ofMinutes(1));
 		verify(builder).useIdleConnectionReaper(true);
+		verify(builder).protocol(HTTP2);
 		verify(builder, never()).connectionTimeToLive(any());
 	}
 
 	@Test
-	public void testCreateNettyHttpClientMaxConcurrency() {
+	public void testCreateNettyHttpClientConnectionTimeout() {
 		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
-		clientConfiguration.setMaxConnections(100);
+		clientConfiguration.setConnectionTimeout(1000);
 
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
 
-		verify(builder).maxConcurrency(100);
+		verify(builder).connectionTimeout(Duration.ofSeconds(1));
 	}
 
 	@Test
-	public void testCreateNettyHttpClientConnectionTimeout() {
-		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
-		clientConfiguration.setConnectionTimeout(1000);
+	public void testCreateNettyHttpClientMaxConcurrency() {
+		Properties clientConfiguration = new Properties();
+		clientConfiguration.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "123");
 
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(new ClientConfigurationFactory().getConfig(), builder, clientConfiguration);
 
-		verify(builder).connectionTimeout(Duration.ofSeconds(1));
+		verify(builder).maxConcurrency(123);
 	}
 
 	@Test
@@ -269,7 +274,7 @@ public class AwsV2UtilTest {
 
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
 
 		verify(builder).writeTimeout(Duration.ofSeconds(3));
 	}
@@ -281,7 +286,7 @@ public class AwsV2UtilTest {
 
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
 
 		verify(builder).connectionMaxIdleTime(Duration.ofSeconds(2));
 	}
@@ -293,7 +298,7 @@ public class AwsV2UtilTest {
 
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
 
 		verify(builder).useIdleConnectionReaper(false);
 	}
@@ -305,7 +310,7 @@ public class AwsV2UtilTest {
 
 		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
 
-		AwsV2Util.createHttpClient(clientConfiguration, builder);
+		AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
 
 		verify(builder).connectionTimeToLive(Duration.ofSeconds(5));
 	}
@@ -383,6 +388,9 @@ public class AwsV2UtilTest {
 		when(builder.writeTimeout(any())).thenReturn(builder);
 		when(builder.connectionMaxIdleTime(any())).thenReturn(builder);
 		when(builder.useIdleConnectionReaper(anyBoolean())).thenReturn(builder);
+		when(builder.connectionAcquisitionTimeout(any())).thenReturn(builder);
+		when(builder.protocol(any())).thenReturn(builder);
+		when(builder.http2Configuration(any(Http2Configuration.class))).thenReturn(builder);
 
 		return builder;
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index bb0f6a1..ee32b6b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -38,6 +38,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
@@ -50,7 +51,7 @@ import static org.junit.Assert.fail;
 public class KinesisConfigUtilTest {
 
 	@Rule
-	private ExpectedException exception = ExpectedException.none();
+	public ExpectedException exception = ExpectedException.none();
 
 	// ----------------------------------------------------------------------
 	// getValidatedProducerConfiguration() tests
@@ -214,6 +215,7 @@ public class KinesisConfigUtilTest {
 	// ----------------------------------------------------------------------
 	// validateEfoConfiguration() tests
 	// ----------------------------------------------------------------------
+
 	@Test
 	public void testNoEfoRegistrationTypeInConfig() {
 		Properties testConfig = TestUtils.getStandardProperties();
@@ -282,6 +284,37 @@ public class KinesisConfigUtilTest {
 		List<String> streams = Arrays.asList("stream1", "stream2");
 		KinesisConfigUtil.validateEfoConfiguration(testConfig, streams);
 	}
+
+	@Test
+	public void testValidateEfoMaxConcurrency() {
+		// A positive numeric value is accepted without error.
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "55");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testValidateEfoMaxConcurrencyNonNumeric() {
+		// A non-numeric value must be rejected with a descriptive message.
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for EFO HTTP client max concurrency. Must be positive.");
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "abc");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testValidateEfoMaxConcurrencyNegative() {
+		// A negative value must be rejected with a descriptive message.
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for EFO HTTP client max concurrency. Must be positive.");
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "-1");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+
 	// ----------------------------------------------------------------------
 	// validateConsumerConfiguration() tests
 	// ----------------------------------------------------------------------