Posted to commits@flink.apache.org by tz...@apache.org on 2020/08/24 06:39:48 UTC

[flink] branch master updated: [FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5487fcf  [FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher
5487fcf is described below

commit 5487fcf2c3451e8ec4a21160f370226a2f4dc1f5
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Fri Jul 10 22:12:23 2020 +0100

    [FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher
    
    This closes #12881.
---
 .../internals/DynamoDBStreamsDataFetcher.java      |  15 +-
 .../kinesis/internals/KinesisDataFetcher.java      |  62 ++--
 .../kinesis/internals/ShardConsumer.java           | 358 ++++-----------------
 .../kinesis/internals/publisher/RecordBatch.java   |  90 ++++++
 .../internals/publisher/RecordPublisher.java       |  57 ++++
 .../publisher/RecordPublisherFactory.java          |  48 +++
 .../polling/AdaptivePollingRecordPublisher.java    | 130 ++++++++
 .../publisher/polling/PollingRecordPublisher.java  | 168 ++++++++++
 .../PollingRecordPublisherConfiguration.java       |  62 ++++
 .../polling/PollingRecordPublisherFactory.java     |  88 +++++
 ... => PollingRecordPublisherMetricsReporter.java} |  47 +--
 ...rter.java => ShardConsumerMetricsReporter.java} |  55 +---
 .../connectors/kinesis/model/StartingPosition.java | 109 +++++++
 .../connectors/kinesis/proxy/KinesisProxy.java     |  36 +--
 .../streaming/connectors/kinesis/util/AWSUtil.java |  24 ++
 .../connectors/kinesis/util/KinesisConfigUtil.java |  32 +-
 .../kinesis/internals/KinesisDataFetcherTest.java  |   2 +-
 .../kinesis/internals/ShardConsumerTest.java       |  55 ++--
 .../internals/publisher/RecordBatchTest.java       |  94 ++++++
 .../PollingRecordPublisherConfigurationTest.java   |  74 +++++
 .../polling/PollingRecordPublisherFactoryTest.java |  69 ++++
 .../polling/PollingRecordPublisherTest.java        | 165 ++++++++++
 .../PollingRecordPublisherMetricsReporterTest.java |  72 +++++
 .../metrics/ShardConsumerMetricsReporterTest.java  |  69 ++++
 .../kinesis/model/StartingPositionTest.java        |  96 ++++++
 .../testutils/FakeKinesisBehavioursFactory.java    |  21 +-
 .../connectors/kinesis/testutils/TestUtils.java    |  23 ++
 .../connectors/kinesis/util/AWSUtilTest.java       |  33 ++
 .../kinesis/util/KinesisConfigUtilTest.java        |  54 ++++
 29 files changed, 1746 insertions(+), 462 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index e33a0c8..afa1c28 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -18,9 +18,10 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
@@ -89,7 +90,7 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param handle stream handle
 	 * @param lastSeqNum last sequence number
-	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @param metricGroup the metric group to report metrics to
 	 * @return
 	 */
 	@Override
@@ -97,16 +98,16 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
 		Integer subscribedShardStateIndex,
 		StreamShardHandle handle,
 		SequenceNumber lastSeqNum,
-		ShardMetricsReporter shardMetricsReporter,
-		KinesisDeserializationSchema<T> shardDeserializer) {
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
 
-		return new ShardConsumer(
+		return new ShardConsumer<T>(
 			this,
+			createRecordPublisher(lastSeqNum, getConsumerConfiguration(), metricGroup, handle),
 			subscribedShardStateIndex,
 			handle,
 			lastSeqNum,
-			DynamoDBStreamsProxy.create(getConsumerConfiguration()),
-			shardMetricsReporter,
+			new ShardConsumerMetricsReporter(metricGroup),
 			shardDeserializer);
 	}
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 59e2cb1..2be9b1c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,17 +27,22 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
 import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
@@ -181,6 +186,9 @@ public class KinesisDataFetcher<T> {
 	/** The Kinesis proxy that the fetcher will be using to discover new shards. */
 	private final KinesisProxyInterface kinesis;
 
+	/** The factory used to create record publishers that consume from Kinesis shards. */
+	private final RecordPublisherFactory recordPublisherFactory;
+
 	/** Thread that executed runFetcher(). */
 	private volatile Thread mainThread;
 
@@ -360,6 +368,7 @@ public class KinesisDataFetcher<T> {
 		this.watermarkTracker = watermarkTracker;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
+		this.recordPublisherFactory = createRecordPublisherFactory();
 
 		this.consumerMetricGroup = runtimeContext.getMetricGroup()
 			.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
@@ -389,25 +398,40 @@ public class KinesisDataFetcher<T> {
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param subscribedShard the shard this consumer is subscribed to
 	 * @param lastSequenceNum the sequence number in the shard to start consuming
-	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @param metricGroup the metric group to report metrics to
 	 * @return shard consumer
 	 */
 	protected ShardConsumer<T> createShardConsumer(
 		Integer subscribedShardStateIndex,
 		StreamShardHandle subscribedShard,
 		SequenceNumber lastSequenceNum,
-		ShardMetricsReporter shardMetricsReporter,
-		KinesisDeserializationSchema<T> shardDeserializer) {
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
+
 		return new ShardConsumer<>(
 			this,
+			createRecordPublisher(lastSequenceNum, configProps, metricGroup, subscribedShard),
 			subscribedShardStateIndex,
 			subscribedShard,
 			lastSequenceNum,
-			this.kinesisProxyFactory.create(configProps),
-			shardMetricsReporter,
+			new ShardConsumerMetricsReporter(metricGroup),
 			shardDeserializer);
 	}
 
+	private RecordPublisherFactory createRecordPublisherFactory() {
+		return new PollingRecordPublisherFactory(kinesisProxyFactory);
+	}
+
+	protected RecordPublisher createRecordPublisher(
+			final SequenceNumber sequenceNumber,
+			final Properties configProps,
+			final MetricGroup metricGroup,
+			final StreamShardHandle subscribedShard) throws InterruptedException {
+
+		StartingPosition startingPosition = AWSUtil.getStartingPosition(sequenceNumber, configProps);
+		return recordPublisherFactory.create(startingPosition, configProps, metricGroup, subscribedShard);
+	}
+
 	/**
 	 * Starts the fetcher. After starting the fetcher, it can only
 	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
@@ -475,7 +499,7 @@ public class KinesisDataFetcher<T> {
 						seededStateIndex,
 						streamShardHandle,
 						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(),
-						registerShardMetrics(consumerMetricGroup, subscribedShardsState.get(seededStateIndex)),
+						registerShardMetricGroup(consumerMetricGroup, subscribedShardsState.get(seededStateIndex)),
 						shardDeserializationSchema));
 			}
 		}
@@ -576,7 +600,7 @@ public class KinesisDataFetcher<T> {
 						newStateIndex,
 						newShardState.getStreamShardHandle(),
 						newShardState.getLastProcessedSequenceNum(),
-						registerShardMetrics(consumerMetricGroup, newShardState),
+						registerShardMetricGroup(consumerMetricGroup, newShardState),
 						shardDeserializationSchema));
 			}
 
@@ -1057,29 +1081,16 @@ public class KinesisDataFetcher<T> {
 	/**
 	 * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
 	 *
-	 * @return a {@link ShardMetricsReporter} that can be used to update metric values
+	 * @return a {@link MetricGroup} that can be used to update metric values
 	 */
-	private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState shardState) {
-		ShardMetricsReporter shardMetrics = new ShardMetricsReporter();
-
-		MetricGroup streamShardMetricGroup = metricGroup
+	private MetricGroup registerShardMetricGroup(final MetricGroup metricGroup, final KinesisStreamShardState shardState) {
+		return metricGroup
 			.addGroup(
 				KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
 				shardState.getStreamShardHandle().getStreamName())
 			.addGroup(
 				KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
 				shardState.getStreamShardHandle().getShard().getShardId());
-
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
-		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);
-		return shardMetrics;
 	}
 
 	// ------------------------------------------------------------------------
@@ -1102,9 +1113,10 @@ public class KinesisDataFetcher<T> {
 	@VisibleForTesting
 	protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
 		return Executors.newCachedThreadPool(new ThreadFactory() {
+			private final AtomicLong threadCount = new AtomicLong(0);
+
 			@Override
 			public Thread newThread(Runnable runnable) {
-				final AtomicLong threadCount = new AtomicLong(0);
 				Thread thread = new Thread(runnable);
 				thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
 				thread.setDaemon(true);
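
The hunk above also fixes a subtle bug in the shard consumer thread factory: the AtomicLong was previously created inside newThread(), so every thread received a fresh counter and was named "...-thread-0". Hoisting the counter into a field of the factory gives each thread a unique index. A standalone sketch of the corrected pattern (names here are illustrative, not from the commit):

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicLong;

    ThreadFactory factory = new ThreadFactory() {
        // One counter shared across all threads this factory creates,
        // rather than a new counter per newThread() call.
        private final AtomicLong threadCount = new AtomicLong(0);

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName("shardConsumers-subtask-thread-" + threadCount.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    };
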
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 8e9c88b..d9c0d9d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -19,256 +19,108 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.isSentinelSequenceNumber;
+import static java.util.Optional.ofNullable;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
+ * Thread that subscribes to the given {@link RecordPublisher}. Each thread is in charge of one Kinesis shard only.
+ * <p>
+ * 	A {@link ShardConsumer} is responsible for:
+ * 	<ul>
+ * 	    <li>Running the {@link RecordPublisher} to consume all records from the subscribed shard</li>
+ * 	    <li>Deserializing and deaggregating incoming records from Kinesis</li>
+ * 	    <li>Logging metrics</li>
+ * 	    <li>Passing the records up to the {@link KinesisDataFetcher}</li>
+ * 	</ul>
+ * </p>
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
-
-	// AWS Kinesis has a read limit of 2 Mb/sec
-	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
-	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
-
 	private final KinesisDeserializationSchema<T> deserializer;
 
-	private final KinesisProxyInterface kinesis;
-
 	private final int subscribedShardStateIndex;
 
 	private final KinesisDataFetcher<T> fetcherRef;
 
 	private final StreamShardHandle subscribedShard;
 
-	private int maxNumberOfRecordsPerFetch;
-	private final long fetchIntervalMillis;
-	private final boolean useAdaptiveReads;
-
-	private final ShardMetricsReporter shardMetricsReporter;
+	private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
 
 	private SequenceNumber lastSequenceNum;
 
-	private Date initTimestamp;
+	private final RecordPublisher recordPublisher;
 
 	/**
 	 * Creates a shard consumer.
 	 *
 	 * @param fetcherRef reference to the owning fetcher
+	 * @param recordPublisher the record publisher used to read records from Kinesis
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param subscribedShard the shard this consumer is subscribed to
 	 * @param lastSequenceNum the sequence number in the shard to start consuming
-	 * @param kinesis the proxy instance to interact with Kinesis
-	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @param shardConsumerMetricsReporter the reporter to report metrics to
+	 * @param shardDeserializer used to deserialize incoming records
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+						RecordPublisher recordPublisher,
 						Integer subscribedShardStateIndex,
 						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum,
-						KinesisProxyInterface kinesis,
-						ShardMetricsReporter shardMetricsReporter,
+						ShardConsumerMetricsReporter shardConsumerMetricsReporter,
 						KinesisDeserializationSchema<T> shardDeserializer) {
 		this.fetcherRef = checkNotNull(fetcherRef);
+		this.recordPublisher = checkNotNull(recordPublisher);
 		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
 		this.subscribedShard = checkNotNull(subscribedShard);
+		this.shardConsumerMetricsReporter = checkNotNull(shardConsumerMetricsReporter);
 		this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-		this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
 		checkArgument(
 			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
 			"Should not start a ShardConsumer if the shard has already been completely read.");
 
 		this.deserializer = shardDeserializer;
-
-		Properties consumerConfig = fetcherRef.getConsumerConfiguration();
-		this.kinesis = kinesis;
-		this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-			Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-		this.useAdaptiveReads = Boolean.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-			Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
-		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
-			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
-
-			try {
-				String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
-					ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
-				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
-				this.initTimestamp = customDateFormat.parse(timestamp);
-			} catch (IllegalArgumentException | NullPointerException exception) {
-				throw new IllegalArgumentException(exception);
-			} catch (ParseException exception) {
-				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
-			}
-		} else {
-			this.initTimestamp = null;
-		}
-	}
-
-	/**
-	 * Returns a shard iterator for the given {@link SequenceNumber}.
-	 *
-	 * @return shard iterator
-	 * @throws Exception
-	 */
-	protected String getShardIterator(SequenceNumber sequenceNumber) throws Exception {
-
-		if (isSentinelSequenceNumber(sequenceNumber)) {
-			return getShardIteratorForSentinel(sequenceNumber);
-		} else {
-			// we will be starting from an actual sequence number (due to restore from failure).
-			return getShardIteratorForRealSequenceNumber(sequenceNumber);
-		}
-	}
-
-	protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
-		String nextShardItr;
-
-		if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
-			// if the shard is already closed, there will be no latest next record to get for this shard
-			if (subscribedShard.isClosed()) {
-				nextShardItr = null;
-			} else {
-				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
-			}
-		} else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
-			nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
-		} else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-			nextShardItr = null;
-		} else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
-			nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
-		} else {
-			throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
-		}
-
-		return nextShardItr;
-	}
-
-	protected String getShardIteratorForRealSequenceNumber(SequenceNumber sequenceNumber)
-			throws Exception {
-
-		// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
-		// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
-		if (sequenceNumber.isAggregated()) {
-			return getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
-		} else {
-			// the last record was non-aggregated, so we can simply start from the next record
-			return kinesis.getShardIterator(
-					subscribedShard,
-					ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
-					sequenceNumber.getSequenceNumber());
-		}
-	}
-
-	protected String getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber)
-			throws Exception {
-
-		String itrForLastAggregatedRecord =
-				kinesis.getShardIterator(
-						subscribedShard,
-						ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-						sequenceNumber.getSequenceNumber());
-
-		// get only the last aggregated record
-		GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
-		List<UserRecord> fetchedRecords = deaggregateRecords(
-				getRecordsResult.getRecords(),
-				subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-				subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-		long lastSubSequenceNum = sequenceNumber.getSubSequenceNumber();
-		for (UserRecord record : fetchedRecords) {
-			// we have found a dangling sub-record if it has a larger subsequence number
-			// than our last sequence number; if so, collect the record and update state
-			if (record.getSubSequenceNumber() > lastSubSequenceNum) {
-				deserializeRecordForCollectionAndUpdateState(record);
-			}
-		}
-
-		return getRecordsResult.getNextShardIterator();
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public void run() {
 		try {
-			String nextShardItr = getShardIterator(lastSequenceNum);
+			while (isRunning()) {
+				final RecordPublisherRunResult result = recordPublisher.run(batch -> {
+					for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
+						if (filterDeaggregatedRecord(userRecord)) {
+							deserializeRecordForCollectionAndUpdateState(userRecord);
+						}
+					}
 
-			long processingStartTimeNanos = System.nanoTime();
+					shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
+					shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
+					shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
+					ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
 
-			while (isRunning()) {
-				if (nextShardItr == null) {
+					return lastSequenceNum;
+				});
+
+				if (result == COMPLETE) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
 
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
-				} else {
-					shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
-					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
-					List<Record> aggregatedRecords = getRecordsResult.getRecords();
-					int numberOfAggregatedRecords = aggregatedRecords.size();
-					shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
-
-					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						aggregatedRecords,
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-					long recordBatchSizeBytes = 0L;
-					for (UserRecord record : fetchedRecords) {
-						recordBatchSizeBytes += record.getData().remaining();
-						deserializeRecordForCollectionAndUpdateState(record);
-					}
-
-					int numberOfDeaggregatedRecords = fetchedRecords.size();
-					shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
-
-					nextShardItr = getRecordsResult.getNextShardIterator();
-
-					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
-					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
-					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
-					shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
-					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
@@ -277,59 +129,9 @@ public class ShardConsumer<T> implements Runnable {
 	}
 
 	/**
-	 * Adjusts loop timing to match target frequency if specified.
-	 * @param processingStartTimeNanos The start time of the run loop "work"
-	 * @param processingEndTimeNanos The end time of the run loop "work"
-	 * @return The System.nanoTime() after the sleep (if any)
-	 * @throws InterruptedException
-	 */
-	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
-		throws InterruptedException {
-		long endTimeNanos = processingEndTimeNanos;
-		if (fetchIntervalMillis != 0) {
-			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
-			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
-			if (sleepTimeMillis > 0) {
-				Thread.sleep(sleepTimeMillis);
-				endTimeNanos = System.nanoTime();
-				shardMetricsReporter.setSleepTimeMillis(sleepTimeMillis);
-			}
-		}
-		return endTimeNanos;
-	}
-
-	/**
-	 * Calculates how many records to read each time through the loop based on a target throughput
-	 * and the measured frequenecy of the loop.
-	 * @param runLoopTimeNanos The total time of one pass through the loop
-	 * @param numRecords The number of records of the last read operation
-	 * @param recordBatchSizeBytes The total batch size of the last read operation
-	 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
-	 */
-	private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
-			int maxNumberOfRecordsPerFetch) {
-		if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
-			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
-			// Adjust number of records to fetch from the shard depending on current average record size
-			// to optimize 2 Mb / sec read limits
-			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
-			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
-			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
-			// Ensure the value is greater than 0 and not more than 10000L
-			maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
-
-			// Set metrics
-			shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
-			shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz);
-			shardMetricsReporter.setBytesPerRead(bytesPerRead);
-		}
-		return maxNumberOfRecordsPerFetch;
-	}
-
-	/**
 	 * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
-	 * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
-	 * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
+	 * by the ExecutorService {@code KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
+	 * would be by calling shutdownNow() on {@code KinesisDataFetcher#shardConsumersExecutor} and let the executor service
 	 * interrupt all currently running {@link ShardConsumer}s.
 	 */
 	private boolean isRunning() {
@@ -338,18 +140,16 @@ public class ShardConsumer<T> implements Runnable {
 
 	/**
 	 * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
-	 * successfully collected sequence number in this shard consumer is also updated so that
-	 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
+	 * successfully collected sequence number in this shard consumer is also updated so that a
+	 * {@link RecordPublisher} may be able to use the correct sequence number to refresh shard
 	 * iterators if necessary.
 	 *
 	 * <p>Note that the server-side Kinesis timestamp is attached to the record when collected. When the
 	 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
 	 *
 	 * @param record record to deserialize and collect
-	 * @throws IOException
 	 */
-	private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
-		throws IOException {
+	private void deserializeRecordForCollectionAndUpdateState(final UserRecord record) {
 		ByteBuffer recordData = record.getData();
 
 		byte[] dataBytes = new byte[recordData.remaining()];
@@ -357,13 +157,18 @@ public class ShardConsumer<T> implements Runnable {
 
 		final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
 
-		final T value = deserializer.deserialize(
-			dataBytes,
-			record.getPartitionKey(),
-			record.getSequenceNumber(),
-			approxArrivalTimestamp,
-			subscribedShard.getStreamName(),
-			subscribedShard.getShard().getShardId());
+		final T value;
+		try {
+			value = deserializer.deserialize(
+				dataBytes,
+				record.getPartitionKey(),
+				record.getSequenceNumber(),
+				approxArrivalTimestamp,
+				subscribedShard.getStreamName(),
+				subscribedShard.getShard().getShardId());
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
 
 		SequenceNumber collectedSequenceNumber = (record.isAggregated())
 			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
@@ -375,53 +180,22 @@ public class ShardConsumer<T> implements Runnable {
 			subscribedShardStateIndex,
 			collectedSequenceNumber);
 
-		lastSequenceNum = collectedSequenceNumber;
+		this.lastSequenceNum = collectedSequenceNumber;
 	}
 
 	/**
-	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
-	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
-	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
-	 * be used for the next call to this method.
-	 *
-	 * <p>Note: it is important that this method is not called again before all the records from the last result have been
-	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
-	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
-	 * incorrect shard iteration if the iterator had to be refreshed.
+	 * Filters out aggregated records that have previously been processed.
+	 * This method is to support restarting from a partially consumed aggregated sequence number.
 	 *
-	 * @param shardItr shard iterator to use
-	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
-	 * @return get records result
-	 * @throws InterruptedException
+	 * @param record the record to filter
+	 * @return {@code true} if the record should be retained
 	 */
-	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws Exception {
-		GetRecordsResult getRecordsResult = null;
-		while (getRecordsResult == null) {
-			try {
-				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-
-				// Update millis behind latest so it gets reported by the millisBehindLatest gauge
-				Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
-				if (millisBehindLatest != null) {
-					shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
-				}
-			} catch (ExpiredIteratorException eiEx) {
-				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-					" refreshing the iterator ...", shardItr, subscribedShard);
-
-				shardItr = getShardIterator(lastSequenceNum);
-
-				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-				if (fetchIntervalMillis != 0) {
-					Thread.sleep(fetchIntervalMillis);
-				}
-			}
+	private boolean filterDeaggregatedRecord(final UserRecord record) {
+		if (!lastSequenceNum.isAggregated()) {
+			return true;
 		}
-		return getRecordsResult;
-	}
 
-	@SuppressWarnings("unchecked")
-	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
-		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
+		return !record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber()) ||
+			record.getSubSequenceNumber() > lastSequenceNum.getSubSequenceNumber();
 	}
 }
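
A worked example of the filterDeaggregatedRecord() semantics above (values illustrative): suppose the consumer restarts from an aggregated checkpoint.

    // Illustrative sketch only -- mirrors the filter above, not code from the commit.
    SequenceNumber lastSequenceNum = new SequenceNumber("49590", 3);
    // UserRecord "49590" / sub 2 -> dropped (already emitted before the restart)
    // UserRecord "49590" / sub 4 -> kept   (dangling sub-record of the same aggregate)
    // UserRecord "49591" / sub 0 -> kept   (a later record entirely)
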
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java
new file mode 100644
index 0000000..f6ce271
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/**
+ * A batch of UserRecords received from Kinesis.
+ * Input records are de-aggregated using the KCL 1.x library.
+ * It is expected that AWS SDK v2.x messages are converted to KCL 1.x {@link UserRecord}.
+ */
+@Internal
+public class RecordBatch {
+
+	private final int aggregatedRecordSize;
+
+	private final List<UserRecord> deaggregatedRecords;
+
+	private final long totalSizeInBytes;
+
+	private final Long millisBehindLatest;
+
+	public RecordBatch(
+			final List<Record> records,
+			final StreamShardHandle subscribedShard,
+			@Nullable final Long millisBehindLatest) {
+		Preconditions.checkNotNull(subscribedShard);
+		this.aggregatedRecordSize = Preconditions.checkNotNull(records).size();
+		this.deaggregatedRecords = deaggregateRecords(records, subscribedShard);
+		this.totalSizeInBytes = this.deaggregatedRecords.stream().mapToInt(r -> r.getData().remaining()).sum();
+		this.millisBehindLatest = millisBehindLatest;
+	}
+
+	public int getAggregatedRecordSize() {
+		return aggregatedRecordSize;
+	}
+
+	public int getDeaggregatedRecordSize() {
+		return deaggregatedRecords.size();
+	}
+
+	public List<UserRecord> getDeaggregatedRecords() {
+		return deaggregatedRecords;
+	}
+
+	public long getTotalSizeInBytes() {
+		return totalSizeInBytes;
+	}
+
+	public long getAverageRecordSizeBytes() {
+		return deaggregatedRecords.isEmpty() ? 0 : getTotalSizeInBytes() / getDeaggregatedRecordSize();
+	}
+
+	@Nullable
+	public Long getMillisBehindLatest() {
+		return millisBehindLatest;
+	}
+
+	private List<UserRecord> deaggregateRecords(final List<Record> records, final StreamShardHandle subscribedShard) {
+		BigInteger start = new BigInteger(subscribedShard.getShard().getHashKeyRange().getStartingHashKey());
+		BigInteger end = new BigInteger(subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+		return UserRecord.deaggregate(records, start, end);
+	}
+}
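
RecordBatch is an immutable wrapper around one GetRecords response that precomputes the per-batch metrics the ShardConsumer reports. A minimal usage sketch, assuming a GetRecordsResult named result and a StreamShardHandle named shard:

    // Sketch only; 'result' and 'shard' are assumed to exist.
    RecordBatch batch = new RecordBatch(
        result.getRecords(), shard, result.getMillisBehindLatest());

    int aggregated = batch.getAggregatedRecordSize();      // records as returned by Kinesis
    int deaggregated = batch.getDeaggregatedRecordSize();  // user records after KCL deaggregation
    long avgBytes = batch.getAverageRecordSizeBytes();     // 0 for an empty batch (guarded above)
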
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
new file mode 100644
index 0000000..29e4fb4
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+
+/**
+ * A {@code RecordPublisher} will consume records from an external stream and deliver them to the registered subscriber.
+ */
+@Internal
+public interface RecordPublisher {
+
+	/**
+	 * Run the record publisher. Records will be consumed from the stream and published to the consumer.
+	 * The number of batches retrieved by a single invocation will vary based on implementation.
+	 *
+	 * @param recordBatchConsumer the record batch consumer to deliver records to
+	 * @return a status enum to represent whether a shard has been fully consumed
+	 * @throws InterruptedException
+	 */
+	RecordPublisherRunResult run(RecordBatchConsumer recordBatchConsumer) throws InterruptedException;
+
+	/**
+	 * A status enum to represent whether a shard has been fully consumed.
+	 */
+	enum RecordPublisherRunResult {
+		/** There are no more records to consume from this shard. */
+		COMPLETE,
+
+		/** There are more records to consume from this shard. */
+		INCOMPLETE
+	}
+
+	/**
+	 * An interface used to collect record batches, and reply with the latest consumed sequence number.
+	 */
+	interface RecordBatchConsumer {
+
+		SequenceNumber accept(RecordBatch recordBatch);
+	}
+}
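
The interface inverts the old pull loop: run() pushes batches into the consumer, and the consumer replies with the sequence number it last processed so the publisher can recover its position (for example after an expired shard iterator). A minimal driver sketch, assuming a publisher instance and a starting sequence number (the single-element array only keeps the lambda capture effectively final):

    // Sketch of the run() contract; 'publisher' and 'startSeq' are assumed.
    final SequenceNumber[] lastSeq = { startSeq };
    RecordPublisherRunResult result;
    do {
        result = publisher.run(batch -> {
            for (UserRecord record : batch.getDeaggregatedRecords()) {
                // deserialize/emit the record, then advance the marker
                lastSeq[0] = new SequenceNumber(
                    record.getSequenceNumber(), record.getSubSequenceNumber());
            }
            return lastSeq[0];
        });
    } while (result == RecordPublisherRunResult.INCOMPLETE);
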
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
new file mode 100644
index 0000000..0dcd0cb
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.util.Properties;
+
+/**
+ * A factory interface used to create instances of {@link RecordPublisher}.
+ */
+@Internal
+public interface RecordPublisherFactory {
+
+	/**
+	 * Create a {@link RecordPublisher}.
+	 *
+	 * @param startingPosition the position in the shard to start consuming records from
+	 * @param consumerConfig the properties used to configure the {@link RecordPublisher}.
+	 * @param metricGroup the {@link MetricGroup} used to report metrics to
+	 * @param streamShardHandle the stream shard to consume from
+	 * @return the constructed {@link RecordPublisher}
+	 */
+	RecordPublisher create(
+			StartingPosition startingPosition,
+			Properties consumerConfig,
+			MetricGroup metricGroup,
+			StreamShardHandle streamShardHandle) throws InterruptedException;
+
+}
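
A factory implementation only has to turn these four arguments into a concrete publisher. The commit's PollingRecordPublisherFactory (listed in the diffstat) also reads PollingRecordPublisherConfiguration and can choose the adaptive variant; stripped of that, a much-simplified sketch against the PollingRecordPublisher constructor shown below might look like this (the 'proxy' instance and the metrics-reporter constructor are assumptions, not confirmed by this excerpt):

    // Simplified sketch, not the commit's PollingRecordPublisherFactory.
    RecordPublisherFactory factory = (startingPosition, config, metricGroup, shard) ->
        new PollingRecordPublisher(
            startingPosition,
            shard,
            new PollingRecordPublisherMetricsReporter(metricGroup), // assumed ctor
            proxy,      // assumed KinesisProxyInterface instance
            10_000,     // maxNumberOfRecordsPerFetch
            200L);      // expiredIteratorBackoffMillis
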
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
new file mode 100644
index 0000000..bdb7167
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on transactions per second and throughput. This class attempts to balance
+ * those quotas and mitigate backoff errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
+	// AWS Kinesis has a read limit of 2 Mb/sec
+	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
+	private int lastRecordBatchSize = 0;
+
+	private long lastRecordBatchSizeInBytes = 0;
+
+	private long processingStartTimeNanos = System.nanoTime();
+
+	private int maxNumberOfRecordsPerFetch;
+
+	private final long fetchIntervalMillis;
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	AdaptivePollingRecordPublisher(
+			final StartingPosition startingPosition,
+			final StreamShardHandle subscribedShard,
+			final PollingRecordPublisherMetricsReporter metricsReporter,
+			final KinesisProxyInterface kinesisProxy,
+			final int maxNumberOfRecordsPerFetch,
+			final long fetchIntervalMillis) throws InterruptedException {
+		super(startingPosition, subscribedShard, metricsReporter, kinesisProxy, maxNumberOfRecordsPerFetch, fetchIntervalMillis);
+		this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		this.fetchIntervalMillis = fetchIntervalMillis;
+		this.metricsReporter = metricsReporter;
+	}
+
+	@Override
+	public RecordPublisherRunResult run(final RecordBatchConsumer consumer) throws InterruptedException {
+		final RecordPublisherRunResult result = super.run(batch -> {
+			SequenceNumber latestSequenceNumber = consumer.accept(batch);
+			lastRecordBatchSize = batch.getDeaggregatedRecordSize();
+			lastRecordBatchSizeInBytes = batch.getTotalSizeInBytes();
+			return latestSequenceNumber;
+		}, maxNumberOfRecordsPerFetch);
+
+		long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
+		long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+		maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, lastRecordBatchSize, lastRecordBatchSizeInBytes, maxNumberOfRecordsPerFetch);
+		processingStartTimeNanos = adjustmentEndTimeNanos;
+		metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
+
+		return result;
+	}
+
+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException
+	 */
+	private long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+			throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+				metricsReporter.setSleepTimeMillis(sleepTimeMillis);
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
+	 */
+	private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
+			int maxNumberOfRecordsPerFetch) {
+		if (numRecords != 0 && runLoopTimeNanos != 0) {
+			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+			// Adjust number of records to fetch from the shard depending on current average record size
+			// to optimize 2 Mb / sec read limits
+			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+			// Ensure the value is greater than 0 and not more than 10000L
+			maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
+
+			// Set metrics
+			metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
+			metricsReporter.setBytesPerRead(bytesPerRead);
+		}
+		return maxNumberOfRecordsPerFetch;
+	}
+}
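
A worked example of adaptRecordsToRead() (numbers illustrative): if one pass through the loop takes 200 ms, loopFrequencyHz = 1e9 / 2e8 = 5.0, so bytesPerRead = 2,097,152 / 5.0 ≈ 419,430 bytes. If the last batch averaged 2,048 bytes per record, the next fetch requests 419,430 / 2,048 ≈ 204 records, clamped to [1, DEFAULT_SHARD_GETRECORDS_MAX]. Larger records or a faster loop both shrink the fetch size to stay under the 2 MB/sec shard read limit.
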
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
new file mode 100644
index 0000000..4edc1f0
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	private final KinesisProxyInterface kinesisProxy;
+
+	private final StreamShardHandle subscribedShard;
+
+	private String nextShardItr;
+
+	private StartingPosition nextStartingPosition;
+
+	private final int maxNumberOfRecordsPerFetch;
+
+	private final long expiredIteratorBackoffMillis;
+
+	/**
+	 * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+	 * The following KDS services are used: GetRecords and GetShardIterator.
+	 *
+	 * @param startingPosition the position in the stream to start consuming from
+	 * @param subscribedShard the shard in which to consume from
+	 * @param metricsReporter a metric reporter used to output metrics
+	 * @param kinesisProxy the proxy used to communicate with Kinesis
+	 * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+	 * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+	 */
+	PollingRecordPublisher(
+			final StartingPosition startingPosition,
+			final StreamShardHandle subscribedShard,
+			final PollingRecordPublisherMetricsReporter metricsReporter,
+			final KinesisProxyInterface kinesisProxy,
+			final int maxNumberOfRecordsPerFetch,
+			final long expiredIteratorBackoffMillis) throws InterruptedException {
+		this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
+		this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
+		this.metricsReporter = Preconditions.checkNotNull(metricsReporter);
+		this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
+		this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
+
+		Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0);
+		Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0);
+
+		this.nextShardItr = getShardIterator();
+	}
+
+	@Override
+	public RecordPublisherRunResult run(final RecordBatchConsumer consumer) throws InterruptedException {
+		return run(consumer, maxNumberOfRecordsPerFetch);
+	}
+
+	public RecordPublisherRunResult run(final RecordBatchConsumer consumer, int maxNumberOfRecords) throws InterruptedException {
+		if (nextShardItr == null) {
+			return COMPLETE;
+		}
+
+		metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
+
+		GetRecordsResult result = getRecords(nextShardItr, maxNumberOfRecords);
+
+		RecordBatch recordBatch = new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest());
+		SequenceNumber latestSequenceNumber = consumer.accept(recordBatch);
+
+		nextStartingPosition = StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
+		nextShardItr = result.getNextShardIterator();
+		return nextShardItr == null ? COMPLETE : INCOMPLETE;
+	}
+
+	/**
+	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+	 * AWS {@link ExpiredIteratorException}s to ensure that we get results and don't just fail on
+	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+	 * be used for the next call to this method.
+	 *
+	 * <p>Note: it is important that this method is not called again before all the records from the last result have been
+	 * fully collected with {@code ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+	 * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+	 * incorrect shard iteration if the iterator had to be refreshed.
+	 *
+	 * @param shardItr shard iterator to use
+	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+	 * @return get records result
+	 */
+	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+		GetRecordsResult getRecordsResult = null;
+		while (getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
+			} catch (ExpiredIteratorException eiEx) {
+				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+					" refreshing the iterator ...", shardItr, subscribedShard);
+
+				shardItr = getShardIterator();
+
+				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+				if (expiredIteratorBackoffMillis != 0) {
+					Thread.sleep(expiredIteratorBackoffMillis);
+				}
+			}
+		}
+		return getRecordsResult;
+	}
+
+	/**
+	 * Returns a shard iterator for the current {@link StartingPosition}.
+	 *
+	 * @return a shard iterator, or null if the shard is closed and the iterator type is LATEST
+	 */
+	@Nullable
+	private String getShardIterator() throws InterruptedException {
+		if (nextStartingPosition.getShardIteratorType() == LATEST && subscribedShard.isClosed()) {
+			return null;
+		}
+
+		return kinesisProxy.getShardIterator(
+			subscribedShard,
+			nextStartingPosition.getShardIteratorType().toString(),
+			nextStartingPosition.getStartingMarker());
+	}
+}
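
A minimal sketch of the run-loop contract above. The wiring is illustrative (a publisher
would normally be built by PollingRecordPublisherFactory) and lastHandledSequenceNumber is a
hypothetical helper; the only behaviour assumed from this patch is that
RecordBatchConsumer#accept returns the sequence number of the last record processed, which
the publisher uses to rebuild its StartingPosition when a shard iterator expires.

    // Drive the publisher until the shard is fully consumed (COMPLETE).
    RecordPublisherRunResult result;
    do {
        result = recordPublisher.run(batch -> {
            // deserialize and emit the records in this batch here ...
            // return the last sequence number handled so an expired iterator
            // can be refreshed from the correct position
            return lastHandledSequenceNumber(batch); // hypothetical helper
        });
    } while (result == RecordPublisherRunResult.INCOMPLETE);
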
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfiguration.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfiguration.java
new file mode 100644
index 0000000..d5a6baf
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * A configuration class for {@link PollingRecordPublisher}, instantiated from a properties map.
+ */
+@Internal
+public class PollingRecordPublisherConfiguration {
+
+	private final boolean adaptiveReads;
+
+	private final int maxNumberOfRecordsPerFetch;
+
+	private final long fetchIntervalMillis;
+
+	public PollingRecordPublisherConfiguration(final Properties consumerConfig) {
+		this.maxNumberOfRecordsPerFetch = Integer.parseInt(consumerConfig.getProperty(
+			ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+			Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
+
+		this.fetchIntervalMillis = Long.parseLong(consumerConfig.getProperty(
+			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+		this.adaptiveReads = Boolean.parseBoolean(consumerConfig.getProperty(
+			ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
+			Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
+	}
+
+	public boolean isAdaptiveReads() {
+		return adaptiveReads;
+	}
+
+	public int getMaxNumberOfRecordsPerFetch() {
+		return maxNumberOfRecordsPerFetch;
+	}
+
+	public long getFetchIntervalMillis() {
+		return fetchIntervalMillis;
+	}
+}
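
A small usage sketch for the configuration class, assuming only the property keys referenced
above; the values are arbitrary.

    Properties props = new Properties();
    props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
    props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "250");
    props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

    PollingRecordPublisherConfiguration config = new PollingRecordPublisherConfiguration(props);
    // config.getMaxNumberOfRecordsPerFetch() == 5000
    // config.getFetchIntervalMillis() == 250
    // config.isAdaptiveReads() == true
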
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
new file mode 100644
index 0000000..ee5034f
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.FlinkKinesisProxyFactory;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A {@link RecordPublisher} factory used to create instances of {@link PollingRecordPublisher}.
+ */
+@Internal
+public class PollingRecordPublisherFactory implements RecordPublisherFactory {
+
+	private final FlinkKinesisProxyFactory kinesisProxyFactory;
+
+	public PollingRecordPublisherFactory(FlinkKinesisProxyFactory kinesisProxyFactory) {
+		this.kinesisProxyFactory = kinesisProxyFactory;
+	}
+
+	/**
+	 * Creates a {@link PollingRecordPublisher}.
+	 * An {@link AdaptivePollingRecordPublisher} is created instead when adaptive reads are enabled in the configuration.
+	 *
+	 * @param startingPosition the position in the shard to start consuming records from
+	 * @param consumerConfig the consumer configuration properties
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamShardHandle the shard this consumer is subscribed to
+	 * @return a {@link PollingRecordPublisher}
+	 */
+	@Override
+	public PollingRecordPublisher create(
+			final StartingPosition startingPosition,
+			final Properties consumerConfig,
+			final MetricGroup metricGroup,
+			final StreamShardHandle streamShardHandle) throws InterruptedException {
+		Preconditions.checkNotNull(startingPosition);
+		Preconditions.checkNotNull(consumerConfig);
+		Preconditions.checkNotNull(metricGroup);
+		Preconditions.checkNotNull(streamShardHandle);
+
+		final PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(consumerConfig);
+		final PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(metricGroup);
+		final KinesisProxyInterface kinesisProxy = kinesisProxyFactory.create(consumerConfig);
+
+		if (configuration.isAdaptiveReads()) {
+			return new AdaptivePollingRecordPublisher(
+				startingPosition,
+				streamShardHandle,
+				metricsReporter,
+				kinesisProxy,
+				configuration.getMaxNumberOfRecordsPerFetch(),
+				configuration.getFetchIntervalMillis());
+		} else {
+			return new PollingRecordPublisher(
+				startingPosition,
+				streamShardHandle,
+				metricsReporter,
+				kinesisProxy,
+				configuration.getMaxNumberOfRecordsPerFetch(),
+				configuration.getFetchIntervalMillis());
+		}
+	}
+}
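
A sketch of how the factory is wired, mirroring the tests further down. Here consumerConfig,
metricGroup and shardHandle stand in for values supplied by the data fetcher, and
KinesisProxy::create is assumed to satisfy FlinkKinesisProxyFactory.

    PollingRecordPublisherFactory factory = new PollingRecordPublisherFactory(KinesisProxy::create);

    RecordPublisher publisher = factory.create(
        StartingPosition.restartFromSequenceNumber(SENTINEL_LATEST_SEQUENCE_NUM.get()),
        consumerConfig,   // SHARD_USE_ADAPTIVE_READS here selects the adaptive variant
        metricGroup,
        shardHandle);
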
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporter.java
similarity index 62%
copy from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
copy to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporter.java
index 4a27b9c..c0f77a8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporter.java
@@ -19,30 +19,27 @@
 package org.apache.flink.streaming.connectors.kinesis.metrics;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher;
 
 /**
- * A container for {@link ShardConsumer}s to report metric values.
+ * A container for {@link PollingRecordPublisher}s to report metric values.
  */
 @Internal
-public class ShardMetricsReporter {
+public class PollingRecordPublisherMetricsReporter {
 
-	private volatile long millisBehindLatest = -1;
 	private volatile double loopFrequencyHz = 0.0;
 	private volatile double bytesPerRead = 0.0;
 	private volatile long runLoopTimeNanos = 0L;
-	private volatile long averageRecordSizeBytes = 0L;
 	private volatile long sleepTimeMillis = 0L;
-	private volatile int numberOfAggregatedRecords = 0;
-	private volatile int numberOfDeaggregatedRecords = 0;
 	private volatile int maxNumberOfRecordsPerFetch = 0;
 
-	public long getMillisBehindLatest() {
-		return millisBehindLatest;
-	}
-
-	public void setMillisBehindLatest(long millisBehindLatest) {
-		this.millisBehindLatest = millisBehindLatest;
+	public PollingRecordPublisherMetricsReporter(final MetricGroup metricGroup) {
+		metricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, this::getMaxNumberOfRecordsPerFetch);
+		metricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, this::getBytesPerRead);
+		metricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, this::getRunLoopTimeNanos);
+		metricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, this::getLoopFrequencyHz);
+		metricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, this::getSleepTimeMillis);
 	}
 
 	public double getLoopFrequencyHz() {
@@ -69,14 +66,6 @@ public class ShardMetricsReporter {
 		this.runLoopTimeNanos = runLoopTimeNanos;
 	}
 
-	public long getAverageRecordSizeBytes() {
-		return averageRecordSizeBytes;
-	}
-
-	public void setAverageRecordSizeBytes(long averageRecordSizeBytes) {
-		this.averageRecordSizeBytes = averageRecordSizeBytes;
-	}
-
 	public long getSleepTimeMillis() {
 		return sleepTimeMillis;
 	}
@@ -85,22 +74,6 @@ public class ShardMetricsReporter {
 		this.sleepTimeMillis = sleepTimeMillis;
 	}
 
-	public int getNumberOfAggregatedRecords() {
-		return numberOfAggregatedRecords;
-	}
-
-	public void setNumberOfAggregatedRecords(int numberOfAggregatedRecords) {
-		this.numberOfAggregatedRecords = numberOfAggregatedRecords;
-	}
-
-	public int getNumberOfDeaggregatedRecords() {
-		return numberOfDeaggregatedRecords;
-	}
-
-	public void setNumberOfDeaggregatedRecords(int numberOfDeaggregatedRecords) {
-		this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
-	}
-
 	public int getMaxNumberOfRecordsPerFetch() {
 		return maxNumberOfRecordsPerFetch;
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
similarity index 64%
rename from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
rename to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
index 4a27b9c..81cce80 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
@@ -19,23 +19,26 @@
 package org.apache.flink.streaming.connectors.kinesis.metrics;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 
 /**
  * A container for {@link ShardConsumer}s to report metric values.
  */
 @Internal
-public class ShardMetricsReporter {
+public class ShardConsumerMetricsReporter {
 
 	private volatile long millisBehindLatest = -1;
-	private volatile double loopFrequencyHz = 0.0;
-	private volatile double bytesPerRead = 0.0;
-	private volatile long runLoopTimeNanos = 0L;
 	private volatile long averageRecordSizeBytes = 0L;
-	private volatile long sleepTimeMillis = 0L;
 	private volatile int numberOfAggregatedRecords = 0;
 	private volatile int numberOfDeaggregatedRecords = 0;
-	private volatile int maxNumberOfRecordsPerFetch = 0;
+
+	public ShardConsumerMetricsReporter(final MetricGroup metricGroup) {
+		metricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, this::getMillisBehindLatest);
+		metricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, this::getNumberOfAggregatedRecords);
+		metricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, this::getNumberOfDeaggregatedRecords);
+		metricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, this::getAverageRecordSizeBytes);
+	}
 
 	public long getMillisBehindLatest() {
 		return millisBehindLatest;
@@ -45,30 +48,6 @@ public class ShardMetricsReporter {
 		this.millisBehindLatest = millisBehindLatest;
 	}
 
-	public double getLoopFrequencyHz() {
-		return loopFrequencyHz;
-	}
-
-	public void setLoopFrequencyHz(double loopFrequencyHz) {
-		this.loopFrequencyHz = loopFrequencyHz;
-	}
-
-	public double getBytesPerRead() {
-		return bytesPerRead;
-	}
-
-	public void setBytesPerRead(double bytesPerRead) {
-		this.bytesPerRead = bytesPerRead;
-	}
-
-	public long getRunLoopTimeNanos() {
-		return runLoopTimeNanos;
-	}
-
-	public void setRunLoopTimeNanos(long runLoopTimeNanos) {
-		this.runLoopTimeNanos = runLoopTimeNanos;
-	}
-
 	public long getAverageRecordSizeBytes() {
 		return averageRecordSizeBytes;
 	}
@@ -77,14 +56,6 @@ public class ShardMetricsReporter {
 		this.averageRecordSizeBytes = averageRecordSizeBytes;
 	}
 
-	public long getSleepTimeMillis() {
-		return sleepTimeMillis;
-	}
-
-	public void setSleepTimeMillis(long sleepTimeMillis) {
-		this.sleepTimeMillis = sleepTimeMillis;
-	}
-
 	public int getNumberOfAggregatedRecords() {
 		return numberOfAggregatedRecords;
 	}
@@ -101,12 +72,4 @@ public class ShardMetricsReporter {
 		this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
 	}
 
-	public int getMaxNumberOfRecordsPerFetch() {
-		return maxNumberOfRecordsPerFetch;
-	}
-
-	public void setMaxNumberOfRecordsPerFetch(int maxNumberOfRecordsPerFetch) {
-		this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
-	}
-
 }
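
The practical effect of splitting the old ShardMetricsReporter is that each new reporter
registers its own gauges eagerly in its constructor, so a caller only passes a metric group
and then uses the setters. A sketch for the shard-level reporter (the polling reporter works
the same way); shardMetricGroup and the setter arguments are illustrative free variables.

    ShardConsumerMetricsReporter reporter = new ShardConsumerMetricsReporter(shardMetricGroup);
    // the constructor above has already registered the millisBehindLatest,
    // record-count and average-record-size gauges against shardMetricGroup
    reporter.setMillisBehindLatest(millisBehindLatest);
    reporter.setNumberOfAggregatedRecords(aggregatedCount);
    reporter.setNumberOfDeaggregatedRecords(deaggregatedCount);
    reporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
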
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
new file mode 100644
index 0000000..e2090de
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import javax.annotation.Nullable;
+
+import java.util.Date;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.isSentinelSequenceNumber;
+
+/**
+ * The position at which to start consuming a stream.
+ */
+@Internal
+public class StartingPosition {
+
+	private final ShardIteratorType shardIteratorType;
+
+	private final Object startingMarker;
+
+	private StartingPosition(final ShardIteratorType shardIteratorType, @Nullable final Object startingMarker) {
+		this.shardIteratorType = shardIteratorType;
+		this.startingMarker = startingMarker;
+	}
+
+	public ShardIteratorType getShardIteratorType() {
+		return shardIteratorType;
+	}
+
+	@Nullable
+	public Object getStartingMarker() {
+		return startingMarker;
+	}
+
+	public static StartingPosition fromTimestamp(final Date date) {
+		return new StartingPosition(AT_TIMESTAMP, date);
+	}
+
+	/**
+	 * Returns the starting position for the next record to consume from the given sequence number.
+	 * The difference between {@code restartFromSequenceNumber()} and {@code continueFromSequenceNumber()} is that
+	 * for {@code restartFromSequenceNumber()} aggregated records are reread so that consumption can resume from a sub-record after a failure.
+	 *
+	 * @param sequenceNumber the last successful sequence number, or sentinel marker
+	 * @return the starting position from which to consume
+	 */
+	public static StartingPosition continueFromSequenceNumber(final SequenceNumber sequenceNumber) {
+		return fromSequenceNumber(sequenceNumber, false);
+	}
+
+	/**
+	 * Returns the starting position to restart record consumption from the given sequence number after failure.
+	 * The difference between {@code restartFromSequenceNumber()} and {@code continueFromSequenceNumber()} is that
+	 * for {@code restartFromSequenceNumber()} aggregated records are reread so that consumption can resume from a sub-record after a failure.
+	 *
+	 * @param sequenceNumber the last successful sequence number, or sentinel marker
+	 * @return the starting position from which to consume
+	 */
+	public static StartingPosition restartFromSequenceNumber(final SequenceNumber sequenceNumber) {
+		return fromSequenceNumber(sequenceNumber, true);
+	}
+
+	private static StartingPosition fromSequenceNumber(final SequenceNumber sequenceNumber, final boolean restart) {
+		if (isSentinelSequenceNumber(sequenceNumber)) {
+			return new StartingPosition(fromSentinelSequenceNumber(sequenceNumber), null);
+		} else {
+			// we will be starting from an actual sequence number (due to restore from failure).
+			return new StartingPosition(getShardIteratorType(sequenceNumber, restart), sequenceNumber.getSequenceNumber());
+		}
+	}
+
+	private static ShardIteratorType getShardIteratorType(final SequenceNumber sequenceNumber, final boolean restart) {
+		return restart && sequenceNumber.isAggregated() ? AT_SEQUENCE_NUMBER : AFTER_SEQUENCE_NUMBER;
+	}
+
+	private static ShardIteratorType fromSentinelSequenceNumber(final SequenceNumber sequenceNumber) {
+		if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+			return LATEST;
+		} else if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
+			return TRIM_HORIZON;
+		} else {
+			throw new IllegalArgumentException("Unexpected sentinel type: " + sequenceNumber);
+		}
+	}
+}
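
A sketch of how the factory methods map onto shard iterator types; the sequence number
string is a placeholder.

    // Sentinels translate directly to an iterator type with no starting marker.
    StartingPosition latest =
        StartingPosition.restartFromSequenceNumber(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get());
    // latest.getShardIteratorType() == LATEST, latest.getStartingMarker() == null

    // Real sequence numbers carry the number itself as the marker. On restart, an
    // aggregated sequence number yields AT_SEQUENCE_NUMBER so sub-records are reread;
    // continuing yields AFTER_SEQUENCE_NUMBER.
    StartingPosition resumed =
        StartingPosition.continueFromSequenceNumber(new SequenceNumber("fake-seq-num"));
    // resumed.getShardIteratorType() == AFTER_SEQUENCE_NUMBER
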
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index fe4ae32b..5f2d6d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -128,13 +128,13 @@ public class KinesisProxy implements KinesisProxyInterface {
 	/** Maximum retry attempts for the get shard iterator operation. */
 	private final int getShardIteratorMaxRetries;
 
-	/* Backoff millis for the describe stream operation. */
+	/** Backoff millis for the describe stream operation. */
 	private final long describeStreamBaseBackoffMillis;
 
-	/* Maximum backoff millis for the describe stream operation. */
+	/** Maximum backoff millis for the describe stream operation. */
 	private final long describeStreamMaxBackoffMillis;
 
-	/* Exponential backoff power constant for the describe stream operation. */
+	/** Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
 	/**
@@ -148,61 +148,61 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 		this.kinesisClient = createKinesisClient(configProps);
 
-		this.listShardsBaseBackoffMillis = Long.valueOf(
+		this.listShardsBaseBackoffMillis = Long.parseLong(
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
 				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
-		this.listShardsMaxBackoffMillis = Long.valueOf(
+		this.listShardsMaxBackoffMillis = Long.parseLong(
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
 				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
-		this.listShardsExpConstant = Double.valueOf(
+		this.listShardsExpConstant = Double.parseDouble(
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.listShardsMaxRetries = Integer.valueOf(
+		this.listShardsMaxRetries = Integer.parseInt(
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
-		this.describeStreamBaseBackoffMillis = Long.valueOf(
+		this.describeStreamBaseBackoffMillis = Long.parseLong(
 				configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
 						Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
-		this.describeStreamMaxBackoffMillis = Long.valueOf(
+		this.describeStreamMaxBackoffMillis = Long.parseLong(
 				configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
 						Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
-		this.describeStreamExpConstant = Double.valueOf(
+		this.describeStreamExpConstant = Double.parseDouble(
 				configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
 						Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getRecordsBaseBackoffMillis = Long.valueOf(
+		this.getRecordsBaseBackoffMillis = Long.parseLong(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
-		this.getRecordsMaxBackoffMillis = Long.valueOf(
+		this.getRecordsMaxBackoffMillis = Long.parseLong(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
-		this.getRecordsExpConstant = Double.valueOf(
+		this.getRecordsExpConstant = Double.parseDouble(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getRecordsMaxRetries = Integer.valueOf(
+		this.getRecordsMaxRetries = Integer.parseInt(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
 
-		this.getShardIteratorBaseBackoffMillis = Long.valueOf(
+		this.getShardIteratorBaseBackoffMillis = Long.parseLong(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
-		this.getShardIteratorMaxBackoffMillis = Long.valueOf(
+		this.getShardIteratorMaxBackoffMillis = Long.parseLong(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
-		this.getShardIteratorExpConstant = Double.valueOf(
+		this.getShardIteratorExpConstant = Double.parseDouble(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getShardIteratorMaxRetries = Integer.valueOf(
+		this.getShardIteratorMaxRetries = Integer.parseInt(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index b5926b1..6652529 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -21,6 +21,9 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
@@ -47,10 +50,13 @@ import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
 import com.fasterxml.jackson.databind.deser.DeserializerFactory;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+
 /**
  * Some utilities specific to Amazon Web Service.
  */
@@ -240,4 +246,22 @@ public class AWSUtil {
 		}
 	}
 
+	/**
+	 * Creates a {@link StartingPosition} from the given {@link SequenceNumber} and {@link Properties}.
+	 * In the case we are restarting from a {@link SentinelSequenceNumber#SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM}, the date
+	 * is parsed from the properties.
+	 *
+	 * @param sequenceNumber the sequence number to resume from
+	 * @param configProps the properties to parse date from
+	 * @return the starting position
+	 */
+	public static StartingPosition getStartingPosition(final SequenceNumber sequenceNumber, final Properties configProps) {
+		if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
+			Date timestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(configProps);
+			return StartingPosition.fromTimestamp(timestamp);
+		} else {
+			return StartingPosition.restartFromSequenceNumber(sequenceNumber);
+		}
+	}
+
 }
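
The one special case handled here is the AT_TIMESTAMP sentinel, which has no concrete
sequence number to restart from and therefore falls back to the timestamp configured in the
consumer properties. A sketch, using the timestamp format documented elsewhere in this patch:

    Properties props = new Properties();
    props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");

    StartingPosition position =
        AWSUtil.getStartingPosition(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get(), props);
    // position.getShardIteratorType() == AT_TIMESTAMP and the starting marker is the parsed Date
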
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index ace075e..6bd5548 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -31,10 +31,14 @@ import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -105,13 +109,13 @@ public class KinesisConfigUtil {
 
 			// specified initial timestamp in stream when using AT_TIMESTAMP
 			if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
-				if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+				if (!config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
 					throw new IllegalArgumentException("Please set value for initial timestamp ('"
-						+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+						+ STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
 				}
 				validateOptionalDateProperty(config,
-					ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
-					config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
+					STREAM_INITIAL_TIMESTAMP,
+					config.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
 					"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
 						+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
 			}
@@ -300,6 +304,26 @@ public class KinesisConfigUtil {
 		}
 	}
 
+	/**
+	 * Parses, from the given properties, the timestamp at which to start consuming from the stream.
+	 *
+	 * @param consumerConfig the properties to parse timestamp from
+	 * @return the timestamp
+	 */
+	public static Date parseStreamTimestampStartingPosition(final Properties consumerConfig) {
+		String timestamp = consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP);
+
+		try {
+			String format = consumerConfig.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+			SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+			return customDateFormat.parse(timestamp);
+		} catch (IllegalArgumentException | NullPointerException exception) {
+			throw new IllegalArgumentException(exception);
+		} catch (ParseException exception) {
+			return new Date((long) (Double.parseDouble(timestamp) * 1000));
+		}
+	}
+
 	private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
 		if (config.containsKey(key)) {
 			try {
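
parseStreamTimestampStartingPosition accepts both forms named in the validation message
above: a date in the configured format, or a double interpreted as epoch seconds. A sketch
showing that the two documented examples resolve to the same instant:

    Properties props = new Properties();
    props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");
    Date fromDateFormat = KinesisConfigUtil.parseStreamTimestampStartingPosition(props);

    props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "1459799926.480");
    Date fromEpochSeconds = KinesisConfigUtil.parseStreamTimestampStartingPosition(props);
    // fromDateFormat.equals(fromEpochSeconds) holds; the ParseException fallback
    // treats the value as fractional epoch seconds
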
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 2815193..478564b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -152,7 +152,7 @@ public class KinesisDataFetcherTest extends TestLogger {
 		// emitting a null (i.e., a corrupt record) should not produce any output, but still have the shard state updated
 		fetcher.emitRecordAndUpdateState(null, 10L, 1, new SequenceNumber("seq-num-2"));
 			assertEquals(new SequenceNumber("seq-num-2"), testShardStates.get(1).getLastProcessedSequenceNum());
-		assertEquals(null, sourceContext.removeLatestOutput()); // no output should have been collected
+		assertNull(sourceContext.removeLatestOutput()); // no output should have been collected
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 6fbe4ee..b39b99e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -18,9 +18,13 @@
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
@@ -28,6 +32,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavi
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -52,6 +57,7 @@ import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequen
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -61,12 +67,11 @@ import static org.mockito.Mockito.verify;
 public class ShardConsumerTest {
 
 	@Test
-	public void testMetricsReporting() {
+	public void testMetricsReporting() throws Exception {
 		KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(500, 5, 500);
 
-		ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
+		ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
 		assertEquals(500, metrics.getMillisBehindLatest());
-		assertEquals(10000, metrics.getMaxNumberOfRecordsPerFetch());
 	}
 
 	@Test
@@ -105,7 +110,7 @@ public class ShardConsumerTest {
 	}
 
 	@Test
-	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() throws Exception {
 		KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7, 500L);
 
 		// Get a total of 1000 records with 9 getRecords() calls,
@@ -114,7 +119,7 @@ public class ShardConsumerTest {
 	}
 
 	@Test
-	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() throws Exception {
 		Properties consumerProperties = new Properties();
 		consumerProperties.setProperty(SHARD_USE_ADAPTIVE_READS, "true");
 
@@ -133,7 +138,7 @@ public class ShardConsumerTest {
 		// Expecting to receive all messages
 		// 10 batches of 3 aggregated records each with 5 child records
 		// 10 * 3 * 5 = 150
-		ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(150, kinesis, fakeSequenceNumber());
+		ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(150, kinesis, fakeSequenceNumber());
 		assertEquals(3, metrics.getNumberOfAggregatedRecords());
 		assertEquals(15, metrics.getNumberOfDeaggregatedRecords());
 
@@ -149,7 +154,7 @@ public class ShardConsumerTest {
 		// 5 batches of 1 aggregated record each with 10 child records
 		// Last consumed message was sub-sequence 5 (6/10) (zero based) (remaining are 6, 7, 8, 9)
 		// 5 * 1 * 10 - 6 = 44
-		ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(44, kinesis, sequenceNumber);
+		ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(44, kinesis, sequenceNumber);
 		assertEquals(1, metrics.getNumberOfAggregatedRecords());
 		assertEquals(10, metrics.getNumberOfDeaggregatedRecords());
 
@@ -160,19 +165,20 @@ public class ShardConsumerTest {
 		return new SequenceNumber("fakeStartingState");
 	}
 
-	private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
-			final int expectedNumberOfMessages,
-			final KinesisProxyInterface kinesis,
-			final SequenceNumber startingSequenceNumber) {
+	private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+		final int expectedNumberOfMessages,
+		final KinesisProxyInterface kinesis,
+		final SequenceNumber startingSequenceNumber) throws Exception {
 		return assertNumberOfMessagesReceivedFromKinesis(expectedNumberOfMessages, kinesis, startingSequenceNumber, new Properties());
 	}
 
-	private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
-			final int expectedNumberOfMessages,
-			final KinesisProxyInterface kinesis,
-			final SequenceNumber startingSequenceNumber,
-			final Properties consumerProperties) {
-		ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
+	private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+		final int expectedNumberOfMessages,
+		final KinesisProxyInterface kinesis,
+		final SequenceNumber startingSequenceNumber,
+		final Properties consumerProperties) throws Exception {
+		ShardConsumerMetricsReporter shardMetricsReporter = new ShardConsumerMetricsReporter(mock(MetricGroup.class));
+
 		StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
@@ -197,13 +203,20 @@ public class ShardConsumerTest {
 				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
 				Mockito.mock(KinesisProxyInterface.class));
 
+		final StreamShardHandle shardHandle = subscribedShardsStateUnderTest.get(0).getStreamShardHandle();
+		SequenceNumber lastProcessedSequenceNum = subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum();
+		StartingPosition startingPosition = AWSUtil.getStartingPosition(lastProcessedSequenceNum, consumerProperties);
+
+		final RecordPublisher recordPublisher = new PollingRecordPublisherFactory(config -> kinesis)
+			.create(startingPosition, fetcher.getConsumerConfiguration(), mock(MetricGroup.class), shardHandle);
+
 		int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
 		new ShardConsumer<>(
 			fetcher,
+			recordPublisher,
 			shardIndex,
-			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
-			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
-			kinesis,
+			shardHandle,
+			lastProcessedSequenceNum,
 			shardMetricsReporter,
 			deserializationSchema)
 			.run();
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatchTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatchTest.java
new file mode 100644
index 0000000..2446093
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatchTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RecordBatch}.
+ */
+public class RecordBatchTest {
+
+	@Test
+	public void testDeaggregateRecordsPassThrough() {
+		RecordBatch result = new RecordBatch(Arrays.asList(
+			record("1"),
+			record("2"),
+			record("3"),
+			record("4")
+		), createDummyStreamShardHandle(), 100L);
+
+		assertEquals(4, result.getAggregatedRecordSize());
+		assertEquals(4, result.getDeaggregatedRecordSize());
+		assertEquals(128, result.getTotalSizeInBytes());
+		assertEquals(32, result.getAverageRecordSizeBytes());
+	}
+
+	@Test
+	public void testDeaggregateRecordsWithAggregatedRecords() {
+		final List<Record> records = TestUtils.createAggregatedRecordBatch(5, 5, new AtomicInteger());
+		RecordBatch result = new RecordBatch(records, createDummyStreamShardHandle(), 100L);
+
+		assertEquals(5, result.getAggregatedRecordSize());
+		assertEquals(25, result.getDeaggregatedRecordSize());
+		assertEquals(25 * 1024, result.getTotalSizeInBytes());
+		assertEquals(1024, result.getAverageRecordSizeBytes());
+	}
+
+	@Test
+	public void testGetAverageRecordSizeBytesEmptyList() {
+		RecordBatch result = new RecordBatch(emptyList(), createDummyStreamShardHandle(), 100L);
+
+		assertEquals(0, result.getAggregatedRecordSize());
+		assertEquals(0, result.getDeaggregatedRecordSize());
+		assertEquals(0, result.getAverageRecordSizeBytes());
+	}
+
+	@Test
+	public void testGetMillisBehindLatest() {
+		RecordBatch result = new RecordBatch(singletonList(record("1")), createDummyStreamShardHandle(), 100L);
+
+		assertEquals(Long.valueOf(100), result.getMillisBehindLatest());
+	}
+
+	private Record record(final String sequenceNumber) {
+		byte[] data = RandomStringUtils.randomAlphabetic(32)
+			.getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+		return new Record()
+			.withData(ByteBuffer.wrap(data))
+			.withPartitionKey("pk")
+			.withSequenceNumber(sequenceNumber);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfigurationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfigurationTest.java
new file mode 100644
index 0000000..b091256
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfigurationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_GETRECORDS_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link PollingRecordPublisherConfiguration}.
+ */
+public class PollingRecordPublisherConfigurationTest {
+
+	@Test
+	public void testDefaults() {
+		PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(new Properties());
+		assertEquals(configuration.getFetchIntervalMillis(), 200);
+		assertEquals(configuration.getMaxNumberOfRecordsPerFetch(), 10000);
+		assertFalse(configuration.isAdaptiveReads());
+	}
+
+	@Test
+	public void testGetFetchIntervalMillis() {
+		Properties properties = properties(SHARD_GETRECORDS_INTERVAL_MILLIS, "1");
+		PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(properties);
+
+		assertEquals(configuration.getFetchIntervalMillis(), 1);
+	}
+
+	@Test
+	public void testGetMaxNumberOfRecordsPerFetch() {
+		Properties properties = properties(SHARD_GETRECORDS_MAX, "2");
+		PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(properties);
+
+		assertEquals(configuration.getMaxNumberOfRecordsPerFetch(), 2);
+	}
+
+	@Test
+	public void testIsAdaptiveReads() {
+		Properties properties = properties(SHARD_USE_ADAPTIVE_READS, "true");
+		PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(properties);
+
+		assertTrue(configuration.isAdaptiveReads());
+	}
+
+	private Properties properties(final String key, final String value) {
+		final Properties properties = new Properties();
+		properties.setProperty(key, value);
+		return properties;
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
new file mode 100644
index 0000000..c249db7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link PollingRecordPublisherFactory}.
+ */
+public class PollingRecordPublisherFactoryTest {
+
+	private final PollingRecordPublisherFactory factory = new PollingRecordPublisherFactory(props -> mock(KinesisProxy.class));
+
+	@Test
+	public void testBuildPollingRecordPublisher() throws Exception {
+		RecordPublisher recordPublisher = factory.create(
+			StartingPosition.restartFromSequenceNumber(SENTINEL_LATEST_SEQUENCE_NUM.get()),
+			new Properties(),
+			mock(MetricGroup.class),
+			mock(StreamShardHandle.class));
+
+		assertTrue(recordPublisher instanceof PollingRecordPublisher);
+		assertFalse(recordPublisher instanceof AdaptivePollingRecordPublisher);
+	}
+
+	@Test
+	public void testBuildAdaptivePollingRecordPublisher() throws Exception {
+		Properties properties = new Properties();
+		properties.setProperty(SHARD_USE_ADAPTIVE_READS, "true");
+
+		RecordPublisher recordPublisher = factory.create(
+			StartingPosition.restartFromSequenceNumber(SENTINEL_LATEST_SEQUENCE_NUM.get()),
+			properties,
+			mock(MetricGroup.class),
+			mock(StreamShardHandle.class));
+
+		assertTrue(recordPublisher instanceof PollingRecordPublisher);
+		assertTrue(recordPublisher instanceof AdaptivePollingRecordPublisher);
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
new file mode 100644
index 0000000..2bbe3da
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordBatchConsumer;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link PollingRecordPublisher}.
+ */
+public class PollingRecordPublisherTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testRunPublishesRecordsToConsumer() throws Exception {
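+		// Fake Kinesis serving 5 records in a single GetRecords batch, reporting 100ms behind latest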
+		KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 1, 100);
+		PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+		TestConsumer consumer = new TestConsumer();
+		recordPublisher.run(consumer);
+
+		assertEquals(1, consumer.recordBatches.size());
+		assertEquals(5, consumer.recordBatches.get(0).getDeaggregatedRecordSize());
+		assertEquals(100L, consumer.recordBatches.get(0).getMillisBehindLatest(), 0);
+	}
+
+	@Test
+	public void testRunReturnsCompleteWhenShardExpires() throws Exception {
+		// There are 2 batches available in the stream
+		KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 2, 100);
+		PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+		// First call results in INCOMPLETE, there is one batch left
+		assertEquals(INCOMPLETE, recordPublisher.run(new TestConsumer()));
+
+		// After second call the shard is complete
+		assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+	}
+
+	@Test
+	public void testRunOnCompletelyConsumedShardReturnsComplete() throws Exception {
+		KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 1, 100);
+		PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+		assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+		assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+	}
+
+	@Test
+	public void testRunReturnsCompleteWhenGetShardIteratorReturnsNull() throws Exception {
+		KinesisProxyInterface fakeKinesis = FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour();
+		PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+		assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+	}
+
+	@Test
+	public void testRunGetRecordsRecoversFromExpiredIteratorException() throws Exception {
+		KinesisProxyInterface fakeKinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(2, 2, 1, 500));
+		PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+		recordPublisher.run(new TestConsumer());
+
+		// Get shard iterator is called twice, once during first run, secondly to refresh expired iterator
+		verify(fakeKinesis, times(2)).getShardIterator(any(), any(), any());
+	}
+
+	@Test
+	public void validateExpiredIteratorBackoffMillisNegativeThrows() throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+
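+		// The final constructor argument is the expired-iterator backoff; a negative value must be rejected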
+		new PollingRecordPublisher(
+			StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
+			TestUtils.createDummyStreamShardHandle(),
+			mock(PollingRecordPublisherMetricsReporter.class),
+			mock(KinesisProxyInterface.class),
+			100,
+			-1);
+	}
+
+	@Test
+	public void validateMaxNumberOfRecordsPerFetchZeroThrows() throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+
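+		// The penultimate constructor argument is maxNumberOfRecordsPerFetch; zero must be rejected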
+		new PollingRecordPublisher(
+			StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
+			TestUtils.createDummyStreamShardHandle(),
+			mock(PollingRecordPublisherMetricsReporter.class),
+			mock(KinesisProxyInterface.class),
+			0,
+			100);
+	}
+
+	private PollingRecordPublisher createPollingRecordPublisher(final KinesisProxyInterface kinesis) throws Exception {
+		PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(mock(MetricGroup.class));
+
+		return new PollingRecordPublisher(
+			StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
+			TestUtils.createDummyStreamShardHandle(),
+			metricsReporter,
+			kinesis,
+			10000,
+			500L);
+	}
+
+	private static class TestConsumer implements RecordBatchConsumer {
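+		// Collects every batch received and returns the sequence number of the last
+		// deaggregated record, mimicking how a consumer would track its position.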
+		private final List<RecordBatch> recordBatches = new ArrayList<>();
+		private String latestSequenceNumber;
+
+		@Override
+		public SequenceNumber accept(final RecordBatch batch) {
+			recordBatches.add(batch);
+
+			if (batch.getDeaggregatedRecordSize() > 0) {
+				List<UserRecord> records = batch.getDeaggregatedRecords();
+				latestSequenceNumber = records.get(records.size() - 1).getSequenceNumber();
+			}
+
+			return new SequenceNumber(latestSequenceNumber);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporterTest.java
new file mode 100644
index 0000000..8cc2e17
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.metrics;
+
+import org.apache.flink.metrics.MetricGroup;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link PollingRecordPublisherMetricsReporter}.
+ */
+public class PollingRecordPublisherMetricsReporterTest {
+
+	@InjectMocks
+	private PollingRecordPublisherMetricsReporter metricsReporter;
+
+	@Mock
+	private MetricGroup metricGroup;
+
+	@Before
+	public void setUp() {
+		MockitoAnnotations.initMocks(this);
+	}
+
+	@Test
+	public void testMetricIdentifiers() {
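+		// The gauges are registered by the reporter's constructor (invoked via @InjectMocks),
+		// so the registrations can be verified directly.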
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.BYTES_PER_READ), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS), any());
+	}
+
+	@Test
+	public void testGettersAndSetters() {
+		metricsReporter.setBytesPerRead(1);
+		metricsReporter.setLoopFrequencyHz(2);
+		metricsReporter.setMaxNumberOfRecordsPerFetch(3);
+		metricsReporter.setRunLoopTimeNanos(4);
+		metricsReporter.setSleepTimeMillis(5);
+
+		assertEquals(1, metricsReporter.getBytesPerRead(), 0);
+		assertEquals(2, metricsReporter.getLoopFrequencyHz(), 0);
+		assertEquals(3, metricsReporter.getMaxNumberOfRecordsPerFetch());
+		assertEquals(4, metricsReporter.getRunLoopTimeNanos());
+		assertEquals(5, metricsReporter.getSleepTimeMillis());
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
new file mode 100644
index 0000000..33e6fd3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.metrics;
+
+import org.apache.flink.metrics.MetricGroup;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link ShardConsumerMetricsReporter}.
+ */
+public class ShardConsumerMetricsReporterTest {
+
+	@InjectMocks
+	private ShardConsumerMetricsReporter metricsReporter;
+
+	@Mock
+	private MetricGroup metricGroup;
+
+	@Before
+	public void setUp() {
+		MockitoAnnotations.initMocks(this);
+	}
+
+	@Test
+	public void testMetricIdentifiers() {
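+		// The gauges are registered by the reporter's constructor (invoked via @InjectMocks),
+		// so the registrations can be verified directly.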
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH), any());
+		verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH), any());
+	}
+
+	@Test
+	public void testGettersAndSetters() {
+		metricsReporter.setAverageRecordSizeBytes(1);
+		metricsReporter.setMillisBehindLatest(2);
+		metricsReporter.setNumberOfAggregatedRecords(3);
+		metricsReporter.setNumberOfDeaggregatedRecords(4);
+
+		assertEquals(1, metricsReporter.getAverageRecordSizeBytes());
+		assertEquals(2, metricsReporter.getMillisBehindLatest());
+		assertEquals(3, metricsReporter.getNumberOfAggregatedRecords());
+		assertEquals(4, metricsReporter.getNumberOfDeaggregatedRecords());
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPositionTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPositionTest.java
new file mode 100644
index 0000000..d2c507d
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPositionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for {@link StartingPosition}.
+ */
+public class StartingPositionTest {
+
+	@Rule
+	public final ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testStartingPositionFromTimestamp() {
+		Date date = new Date();
+		StartingPosition position = StartingPosition.fromTimestamp(date);
+		assertEquals(ShardIteratorType.AT_TIMESTAMP, position.getShardIteratorType());
+		assertEquals(date, position.getStartingMarker());
+	}
+
+	@Test
+	public void testStartingPositionRestartFromSequenceNumber() {
+		SequenceNumber sequenceNumber = new SequenceNumber("100");
+		StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+		assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER, position.getShardIteratorType());
+		assertEquals("100", position.getStartingMarker());
+	}
+
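+	// Restarting within a partially consumed aggregate must re-read the same parent record
+	// (AT_SEQUENCE_NUMBER) so that the remaining deaggregated sub-records are not skipped.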
+	@Test
+	public void testStartingPositionRestartFromAggregatedSequenceNumber() {
+		SequenceNumber sequenceNumber = new SequenceNumber("200", 3);
+		StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+		assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER, position.getShardIteratorType());
+		assertEquals("200", position.getStartingMarker());
+	}
+
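+	// Continuing after a fully consumed aggregate moves past it (AFTER_SEQUENCE_NUMBER).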
+	@Test
+	public void testStartingPositionContinueFromAggregatedSequenceNumber() {
+		SequenceNumber sequenceNumber = new SequenceNumber("200", 3);
+		StartingPosition position = StartingPosition.continueFromSequenceNumber(sequenceNumber);
+		assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER, position.getShardIteratorType());
+		assertEquals("200", position.getStartingMarker());
+	}
+
+	@Test
+	public void testStartingPositionRestartFromSentinelEarliest() {
+		SequenceNumber sequenceNumber = SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
+		StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+		assertEquals(ShardIteratorType.TRIM_HORIZON, position.getShardIteratorType());
+		assertNull(position.getStartingMarker());
+	}
+
+	@Test
+	public void testStartingPositionRestartFromSentinelLatest() {
+		SequenceNumber sequenceNumber = SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get();
+		StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+		assertEquals(ShardIteratorType.LATEST, position.getShardIteratorType());
+		assertNull(position.getStartingMarker());
+	}
+
+	@Test
+	public void testStartingPositionRestartFromSentinelEnding() {
+		thrown.expect(IllegalArgumentException.class);
+
+		SequenceNumber sequenceNumber = SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get();
+
+		// A shard-ending sentinel is not a valid restart position, so this call must throw
+		StartingPosition.restartFromSequenceNumber(sequenceNumber);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 177a7cf..af3bdab 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -43,6 +43,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -121,10 +122,10 @@ public class FakeKinesisBehavioursFactory {
 	 * @param numOfGetRecordsCalls the number batches available in the fake stream
 	 */
 	public static KinesisProxyInterface aggregatedRecords(
-			final int numOfAggregatedRecords,
-			final int numOfChildRecords,
-			final int numOfGetRecordsCalls) {
-			return new SingleShardEmittingAggregatedRecordsKinesis(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls);
+		final int numOfAggregatedRecords,
+		final int numOfChildRecords,
+		final int numOfGetRecordsCalls) {
+		return new SingleShardEmittingAggregatedRecordsKinesis(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls);
 	}
 
 	public static KinesisProxyInterface blockingQueueGetRecords(Map<String, List<BlockingQueue<String>>> streamsToShardQueues) {
@@ -335,14 +336,14 @@ public class FakeKinesisBehavioursFactory {
 
 		public SingleShardEmittingAggregatedRecordsKinesis(
 				final int numOfAggregatedRecords,
-				final int numOfChildRecords,
-				final int numOfGetRecordsCalls) {
+			final int numOfChildRecords,
+			final int numOfGetRecordsCalls) {
 			super(initShardItrToRecordBatch(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls));
 		}
 
 		private static Map<String, List<Record>> initShardItrToRecordBatch(final int numOfAggregatedRecords,
-				final int numOfChildRecords,
-				final int numOfGetRecordsCalls) {
+			final int numOfChildRecords,
+			final int numOfGetRecordsCalls) {
 
 			Map<String, List<Record>> shardToRecordBatch = new HashMap<>();
 
@@ -415,9 +416,7 @@ public class FakeKinesisBehavioursFactory {
 					List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
 					for (int i = 0; i < shardCount; i++) {
 						shardsOfStream.add(
-							new StreamShardHandle(
-								streamName,
-								new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
+							createDummyStreamShardHandle(streamName, KinesisShardIdGenerator.generateFromShardOrder(i)));
 					}
 					streamsWithListOfShards.put(streamName, shardsOfStream);
 				}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
index 7fce762..7c5c786 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -19,12 +19,18 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
 import com.amazonaws.kinesis.agg.AggRecord;
 import com.amazonaws.kinesis.agg.RecordAggregator;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Date;
@@ -93,4 +99,21 @@ public class TestUtils {
 		return recordBatch;
 	}
 
+	public static StreamShardHandle createDummyStreamShardHandle() {
+		return createDummyStreamShardHandle("stream-name", "000000");
+	}
+
+	public static StreamShardHandle createDummyStreamShardHandle(final String streamName, final String shardId) {
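+		// The dummy shard spans the full hash key range (0 to 2^128 - 1) and a bounded sequence number range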
+		final Shard shard = new Shard()
+			.withSequenceNumberRange(new SequenceNumberRange()
+				.withStartingSequenceNumber("0")
+				.withEndingSequenceNumber("9999999999999"))
+			.withHashKeyRange(new HashKeyRange()
+				.withStartingHashKey("0")
+				.withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))
+			.withShardId(shardId);
+
+		return new StreamShardHandle(streamName, shard);
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
index 47ea500..64fd5d1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
@@ -29,9 +30,19 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Properties;
 
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -80,4 +91,26 @@ public class AWSUtilTest {
 	public void testInvalidRegion() {
 		assertFalse(AWSUtil.isValidRegion("ur-east-1"));
 	}
+
+	@Test
+	public void testGetStartingPositionForLatest() {
+		StartingPosition position = AWSUtil.getStartingPosition(SENTINEL_LATEST_SEQUENCE_NUM.get(), new Properties());
+
+		assertEquals(LATEST, position.getShardIteratorType());
+		assertNull(position.getStartingMarker());
+	}
+
+	@Test
+	public void testGetStartingPositionForTimestamp() throws Exception {
+		String timestamp = "2020-08-13T09:18:00.0+01:00";
+		Date expectedTimestamp = new SimpleDateFormat(DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT).parse(timestamp);
+
+		Properties consumerProperties = new Properties();
+		consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+
+		StartingPosition position = AWSUtil.getStartingPosition(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get(), consumerProperties);
+
+		assertEquals(AT_TIMESTAMP, position.getShardIteratorType());
+		assertEquals(expectedTimestamp, position.getStartingMarker());
+	}
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index f589514..bd298d0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -30,8 +30,13 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -41,6 +46,7 @@ import static org.junit.Assert.fail;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KinesisConfigUtil.class)
 public class KinesisConfigUtilTest {
+
 	@Rule
 	private ExpectedException exception = ExpectedException.none();
 
@@ -509,4 +515,52 @@ public class KinesisConfigUtilTest {
 
 		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
 	}
+
+	@Test
+	public void testParseStreamTimestampStartingPositionUsingDefaultFormat() throws Exception {
+		String timestamp = "2020-08-13T09:18:00.0+01:00";
+		Date expectedTimestamp = new SimpleDateFormat(DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT).parse(timestamp);
+
+		Properties consumerProperties = new Properties();
+		consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+
+		Date actualTimestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(consumerProperties);
+
+		assertEquals(expectedTimestamp, actualTimestamp);
+	}
+
+	@Test
+	public void testParseStreamTimestampStartingPositionUsingCustomFormat() throws Exception {
+		String format = "yyyy-MM-dd'T'HH:mm";
+		String timestamp = "2020-08-13T09:23";
+		Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp);
+
+		Properties consumerProperties = new Properties();
+		consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+		consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
+
+		Date actualTimestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(consumerProperties);
+
+		assertEquals(expectedTimestamp, actualTimestamp);
+	}
+
+	@Test
+	public void testParseStreamTimestampStartingPositionUsingParseError() {
+		exception.expect(NumberFormatException.class);
+
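+		// A value that cannot be parsed as a date falls back to numeric (unix timestamp) parsing,
+		// hence the NumberFormatException rather than a ParseException.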
+		Properties consumerProperties = new Properties();
+		consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, "bad");
+
+		KinesisConfigUtil.parseStreamTimestampStartingPosition(consumerProperties);
+	}
+
+	@Test
+	public void testParseStreamTimestampStartingPositionIllegalArgumentException() {
+		exception.expect(IllegalArgumentException.class);
+
+		Properties consumerProperties = new Properties();
+
+		KinesisConfigUtil.parseStreamTimestampStartingPosition(consumerProperties);
+	}
+
 }