Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/11 09:09:58 UTC

[GitHub] [flink] tzulitai commented on a change in pull request #12881: [FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher

tzulitai commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r468417898



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	private final KinesisProxyInterface kinesisProxy;
+
+	private final StreamShardHandle subscribedShard;
+
+	private String nextShardItr;
+
+	private final int maxNumberOfRecordsPerFetch;
+
+	private final long expiredIteratorBackoffMillis;
+
+	/**
+	 * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+	 * The following KDS services are used: GetRecords and GetShardIterator.
+	 *
+	 * @param subscribedShard the shard in which to consume from
+	 * @param metricsReporter a metric reporter used to output metrics
+	 * @param kinesisProxy the proxy used to communicate with kinesis
+	 * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+	 * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+	 */
+	public PollingRecordPublisher(

Review comment:
       The constructor can be package-private.
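   
    A minimal sketch of what that could look like (same parameter list as in the diff, just without the `public` modifier):
   
    ```
    	// package-private: only code in this package (e.g. the publisher factory) needs to instantiate it
    	PollingRecordPublisher(
    			final StreamShardHandle subscribedShard,
    			final PollingRecordPublisherMetricsReporter metricsReporter,
    			final KinesisProxyInterface kinesisProxy,
    			final int maxNumberOfRecordsPerFetch,
    			final long expiredIteratorBackoffMillis) {
    		...
    	}
    ```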

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	private final KinesisProxyInterface kinesisProxy;
+
+	private final StreamShardHandle subscribedShard;
+
+	private String nextShardItr;
+
+	private final int maxNumberOfRecordsPerFetch;
+
+	private final long expiredIteratorBackoffMillis;
+
+	/**
+	 * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+	 * The following KDS services are used: GetRecords and GetShardIterator.
+	 *
+	 * @param subscribedShard the shard in which to consume from
+	 * @param metricsReporter a metric reporter used to output metrics
+	 * @param kinesisProxy the proxy used to communicate with kinesis
+	 * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+	 * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+	 */
+	public PollingRecordPublisher(
+			final StreamShardHandle subscribedShard,
+			final PollingRecordPublisherMetricsReporter metricsReporter,
+			final KinesisProxyInterface kinesisProxy,
+			final int maxNumberOfRecordsPerFetch,
+			final long expiredIteratorBackoffMillis) {
+		this.subscribedShard = subscribedShard;
+		this.metricsReporter = metricsReporter;
+		this.kinesisProxy = kinesisProxy;
+		this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
+	}
+
+	@Override
+	public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer) throws InterruptedException {
+		return run(startingPosition, consumer, maxNumberOfRecordsPerFetch);
+	}
+
+	public RecordPublisherRunResult run(final StartingPosition startingPosition, final Consumer<RecordBatch> consumer, int maxNumberOfRecords) throws InterruptedException {
+		if (nextShardItr == null) {
+			nextShardItr = getShardIterator(startingPosition);
+		}
+
+		if (nextShardItr == null) {
+			return COMPLETE;
+		}
+
+		metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
+
+		GetRecordsResult result = getRecords(nextShardItr, startingPosition, maxNumberOfRecords);
+
+		consumer.accept(new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest()));
+
+		nextShardItr = result.getNextShardIterator();
+		return nextShardItr == null ? COMPLETE : INCOMPLETE;
+	}
+
+	/**
+	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+	 * be used for the next call to this method.
+	 *
+	 * <p>Note: it is important that this method is not called again before all the records from the last result have been
+	 * fully collected with {@code ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+	 * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+	 * incorrect shard iteration if the iterator had to be refreshed.
+	 *
+	 * @param shardItr shard iterator to use
+	 * @param startingPosition the position in the stream in which to consume from
+	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+	 * @return get records result
+	 */
+	private GetRecordsResult getRecords(String shardItr, final StartingPosition startingPosition, int maxNumberOfRecords) throws InterruptedException {
+		GetRecordsResult getRecordsResult = null;
+		while (getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
+			} catch (ExpiredIteratorException eiEx) {
+				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+					" refreshing the iterator ...", shardItr, subscribedShard);
+
+				shardItr = getShardIterator(startingPosition);
+
+				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+				if (expiredIteratorBackoffMillis != 0) {
+					Thread.sleep(expiredIteratorBackoffMillis);
+				}
+			}
+		}
+		return getRecordsResult;
+	}
+
+	/**
+	 * Returns a shard iterator for the given {@link SequenceNumber}.
+	 *
+	 * @return shard iterator
+	 */
+	@Nullable
+	private String getShardIterator(final StartingPosition startingPosition) throws InterruptedException {
+		if (startingPosition.getShardIteratorType() == LATEST && subscribedShard.isClosed()) {
+			return null;
+		}
+
+		return kinesisProxy.getShardIterator(

Review comment:
       Double-checking for my understanding:
   Does this return `null` for an out-of-bounds position?
   Say, the given timestamp is out of bounds, or the sequence number is non-existent.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Consumer;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	private final KinesisProxyInterface kinesisProxy;
+
+	private final StreamShardHandle subscribedShard;
+
+	private String nextShardItr;
+
+	private final int maxNumberOfRecordsPerFetch;
+
+	private final long expiredIteratorBackoffMillis;
+
+	/**
+	 * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+	 * The following KDS services are used: GetRecords and GetShardIterator.
+	 *
+	 * @param subscribedShard the shard in which to consume from
+	 * @param metricsReporter a metric reporter used to output metrics
+	 * @param kinesisProxy the proxy used to communicate with kinesis
+	 * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+	 * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+	 */
+	public PollingRecordPublisher(
+			final StreamShardHandle subscribedShard,
+			final PollingRecordPublisherMetricsReporter metricsReporter,
+			final KinesisProxyInterface kinesisProxy,
+			final int maxNumberOfRecordsPerFetch,
+			final long expiredIteratorBackoffMillis) {
+		this.subscribedShard = subscribedShard;

Review comment:
       Could you add simple sanity checks on these arguments, such as `Preconditions.checkNotNull`?
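   
    For example, a quick sketch using Flink's `Preconditions` (exact error messages up to you):
   
    ```
    	this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
    	this.metricsReporter = Preconditions.checkNotNull(metricsReporter);
    	this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
    	Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0, "maxNumberOfRecordsPerFetch must be positive");
    	Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0, "expiredIteratorBackoffMillis must be non-negative");
    	this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
    	this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
    ```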

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {

Review comment:
       With the record polling refactored out, could you rephrase the class-level Javadoc of this class?
   Essentially this is now a thread that just subscribes to a `RecordPublisher` which publishes records from a single Kinesis shard.
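   
    Something along these lines, perhaps (just a wording suggestion):
   
    ```
    /**
     * Thread that subscribes to a single {@link RecordPublisher}, which in turn
     * publishes records from a single Kinesis shard. Received records are
     * deserialized and forwarded to the owning {@link KinesisDataFetcher}.
     */
    ```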

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -379,49 +206,18 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
 	}
 
 	/**
-	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
-	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
-	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
-	 * be used for the next call to this method.
+	 * Filters out aggregated records that have previously been processed.
+	 * This method is to support restarting from a partially consumed aggregated sequence number.
 	 *
-	 * <p>Note: it is important that this method is not called again before all the records from the last result have been
-	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
-	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
-	 * incorrect shard iteration if the iterator had to be refreshed.
-	 *
-	 * @param shardItr shard iterator to use
-	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
-	 * @return get records result
-	 * @throws InterruptedException
+	 * @param record the record to filter
+	 * @return {@code true} if the record should be retained
 	 */
-	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws Exception {
-		GetRecordsResult getRecordsResult = null;
-		while (getRecordsResult == null) {
-			try {
-				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-
-				// Update millis behind latest so it gets reported by the millisBehindLatest gauge
-				Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
-				if (millisBehindLatest != null) {
-					shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
-				}
-			} catch (ExpiredIteratorException eiEx) {
-				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-					" refreshing the iterator ...", shardItr, subscribedShard);
-
-				shardItr = getShardIterator(lastSequenceNum);
-
-				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-				if (fetchIntervalMillis != 0) {
-					Thread.sleep(fetchIntervalMillis);
-				}
-			}
+	private boolean filterDeaggregatedRecord(final UserRecord record) {
+		if (lastSequenceNum.isAggregated()) {
+			return !record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber()) ||
+				record.getSubSequenceNumber() > lastSequenceNum.getSubSequenceNumber();
 		}
-		return getRecordsResult;
-	}
 
-	@SuppressWarnings("unchecked")
-	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
-		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
+		return true;

Review comment:
       Just for easier code readability, can we rewrite this as:
   
   ```
   if (!lastSequenceNum.isAggregated()) {
       return true;
   }
   
   return !record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber()) ||
            record.getSubSequenceNumber() > lastSequenceNum.getSubSequenceNumber();
   ```
   
   I've found early returns always help with understanding these :)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/**
+ * A batch of UserRecords received from Kinesis.
+ * Input records are de-aggregated using KCL 1.x library.
+ * It is expected that AWS SDK v2.x messages are converted to KCL 1.x {@link UserRecord}.
+ */
+public class RecordBatch {
+
+	private final int aggregatedRecordSize;
+
+	private final List<UserRecord> deaggregatedRecords;
+
+	private final long totalSizeInBytes;
+
+	private final Long millisBehindLatest;
+
+	public RecordBatch(final List<Record> records, final StreamShardHandle subscribedShard, final Long millisBehindLatest) {
+		this.aggregatedRecordSize = records.size();

Review comment:
       Argument sanity / precondition checks here as well, please.
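   
    e.g. a sketch (`millisBehindLatest` is `@Nullable`, so only the other two arguments need a null check):
   
    ```
    	Preconditions.checkNotNull(records);
    	Preconditions.checkNotNull(subscribedShard);
    	// millisBehindLatest is @Nullable, so intentionally no check on it
    ```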

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.util.function.Consumer;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on transactions per second and throughput. This class attempts to balance
+ * quotas and mitigate backoff errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
+	// AWS Kinesis has a read limit of 2 Mb/sec
+	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
+	private int lastRecordBatchSize = 0;
+
+	private long lastRecordBatchSizeInBytes = 0;
+
+	private long processingStartTimeNanos = System.nanoTime();
+
+	private int maxNumberOfRecordsPerFetch;
+
+	private final long fetchIntervalMillis;
+
+	private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+	public AdaptivePollingRecordPublisher(

Review comment:
       The constructor can be package-private.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+
+import java.util.function.Consumer;
+
+/**
+ * A {@code RecordPublisher} will consume records from an external stream and deliver them to the registered subscriber.
+ */
+@Internal
+public interface RecordPublisher {
+
+	/**
+	 * Run the record publisher. Records will be consumed from the stream and published to the consumer.
+	 * The number of batches retrieved by a single invocation will vary based on implementation.
+	 *
+	 * @param startingPosition the position in the stream from which to consume
+	 * @param recordConsumer the record consumer in which to output records
+	 * @return a status enum to represent whether a shard has been fully consumed
+	 * @throws InterruptedException
+	 */
+	RecordPublisherRunResult run(StartingPosition startingPosition, Consumer<RecordBatch> recordConsumer) throws InterruptedException;

Review comment:
       I found the `StartingPosition` argument slightly confusing to be passed in here: from the implementations, it seems like `startingPosition` is only ever used on the first getRecords call, yet with this interface we would be passing in a starting position on every invocation.
   
   It's also slightly more error-prone for the call sites of `RecordPublisher`s: imagine a call site that continues to invoke `run` with a starting position after the previous attempt has already returned `COMPLETE`.
   
   As food for thought: could passing the `StartingPosition` in via the `RecordPublisherFactory` be a better option?
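   
    For illustration only, a hypothetical shape of that API (names and signatures are a sketch, not a concrete proposal):
   
    ```
    // The factory binds the starting position once, at creation time ...
    public interface RecordPublisherFactory {
    	RecordPublisher create(StartingPosition startingPosition, StreamShardHandle subscribedShard);
    }
   
    // ... so the publisher itself no longer needs it on every invocation:
    public interface RecordPublisher {
    	RecordPublisherRunResult run(Consumer<RecordBatch> recordConsumer) throws InterruptedException;
    }
    ```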

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import javax.annotation.Nullable;
+
+import java.util.Date;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.isSentinelSequenceNumber;
+
+/**
+ * The position in which to start consuming from a stream.
+ */
+public class StartingPosition {

Review comment:
       Mark with `@Internal`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
-
-	// AWS Kinesis has a read limit of 2 Mb/sec
-	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
-	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
-
 	private final KinesisDeserializationSchema<T> deserializer;
 
-	private final KinesisProxyInterface kinesis;
-
 	private final int subscribedShardStateIndex;
 
 	private final KinesisDataFetcher<T> fetcherRef;
 
 	private final StreamShardHandle subscribedShard;
 
-	private int maxNumberOfRecordsPerFetch;
-	private final long fetchIntervalMillis;
-	private final boolean useAdaptiveReads;
+	private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
 
-	private final ShardMetricsReporter shardMetricsReporter;
+	private StartingPosition startingPosition;
 
 	private SequenceNumber lastSequenceNum;
 
-	private Date initTimestamp;
+	private final RecordPublisher recordPublisher;
 
 	/**
 	 * Creates a shard consumer.
 	 *
 	 * @param fetcherRef reference to the owning fetcher
+	 * @param recordPublisher the record publisher used to read records from kinesis
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param subscribedShard the shard this consumer is subscribed to
 	 * @param lastSequenceNum the sequence number in the shard to start consuming
-	 * @param kinesis the proxy instance to interact with Kinesis
-	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @param shardConsumerMetricsReporter the reporter to report metrics to
+	 * @param shardDeserializer used to deserialize incoming records
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+						RecordPublisher recordPublisher,
 						Integer subscribedShardStateIndex,
 						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum,
-						KinesisProxyInterface kinesis,
-						ShardMetricsReporter shardMetricsReporter,
+						ShardConsumerMetricsReporter shardConsumerMetricsReporter,
 						KinesisDeserializationSchema<T> shardDeserializer) {
 		this.fetcherRef = checkNotNull(fetcherRef);
+		this.recordPublisher = checkNotNull(recordPublisher);
 		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
 		this.subscribedShard = checkNotNull(subscribedShard);
+		this.shardConsumerMetricsReporter = checkNotNull(shardConsumerMetricsReporter);
 		this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-		this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
 		checkArgument(
 			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
 			"Should not start a ShardConsumer if the shard has already been completely read.");
 
 		this.deserializer = shardDeserializer;
 
 		Properties consumerConfig = fetcherRef.getConsumerConfiguration();
-		this.kinesis = kinesis;
-		this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-			Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-		this.useAdaptiveReads = Boolean.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-			Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
-		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+
+		if (lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+			Date initTimestamp;
 			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
 
 			try {
 				String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
 					ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
 				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
-				this.initTimestamp = customDateFormat.parse(timestamp);
+				initTimestamp = customDateFormat.parse(timestamp);
 			} catch (IllegalArgumentException | NullPointerException exception) {
 				throw new IllegalArgumentException(exception);
 			} catch (ParseException exception) {
-				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
+				initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
 			}
-		} else {
-			this.initTimestamp = null;
-		}
-	}
-
-	/**
-	 * Returns a shard iterator for the given {@link SequenceNumber}.
-	 *
-	 * @return shard iterator
-	 * @throws Exception
-	 */
-	protected String getShardIterator(SequenceNumber sequenceNumber) throws Exception {
 
-		if (isSentinelSequenceNumber(sequenceNumber)) {
-			return getShardIteratorForSentinel(sequenceNumber);
+			startingPosition = StartingPosition.fromTimestamp(initTimestamp);
 		} else {
-			// we will be starting from an actual sequence number (due to restore from failure).
-			return getShardIteratorForRealSequenceNumber(sequenceNumber);
+			startingPosition = StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
 		}
 	}
 
-	protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
-		String nextShardItr;
-
-		if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
-			// if the shard is already closed, there will be no latest next record to get for this shard
-			if (subscribedShard.isClosed()) {
-				nextShardItr = null;
-			} else {
-				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
-			}
-		} else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
-			nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
-		} else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-			nextShardItr = null;
-		} else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
-			nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
-		} else {
-			throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
-		}
-
-		return nextShardItr;
-	}
-
-	protected String getShardIteratorForRealSequenceNumber(SequenceNumber sequenceNumber)
-			throws Exception {
-
-		// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
-		// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
-		if (sequenceNumber.isAggregated()) {
-			return getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
-		} else {
-			// the last record was non-aggregated, so we can simply start from the next record
-			return kinesis.getShardIterator(
-					subscribedShard,
-					ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
-					sequenceNumber.getSequenceNumber());
-		}
-	}
-
-	protected String getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber)
-			throws Exception {
-
-		String itrForLastAggregatedRecord =
-				kinesis.getShardIterator(
-						subscribedShard,
-						ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-						sequenceNumber.getSequenceNumber());
-
-		// get only the last aggregated record
-		GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
-		List<UserRecord> fetchedRecords = deaggregateRecords(
-				getRecordsResult.getRecords(),
-				subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-				subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-		long lastSubSequenceNum = sequenceNumber.getSubSequenceNumber();
-		for (UserRecord record : fetchedRecords) {
-			// we have found a dangling sub-record if it has a larger subsequence number
-			// than our last sequence number; if so, collect the record and update state
-			if (record.getSubSequenceNumber() > lastSubSequenceNum) {
-				deserializeRecordForCollectionAndUpdateState(record);
-			}
-		}
-
-		return getRecordsResult.getNextShardIterator();
-	}
-
-	@SuppressWarnings("unchecked")
 	@Override
 	public void run() {
 		try {
-			String nextShardItr = getShardIterator(lastSequenceNum);
+			while (isRunning()) {
+				final RecordPublisherRunResult result = recordPublisher.run(startingPosition, batch -> {
+					for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
+						if (filterDeaggregatedRecord(userRecord)) {
+							deserializeRecordForCollectionAndUpdateState(userRecord);
+						}
+					}
 
-			long processingStartTimeNanos = System.nanoTime();
+					shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
+					shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
+					shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
+					ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
+				});
 
-			while (isRunning()) {
-				if (nextShardItr == null) {
+				if (result == COMPLETE) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
 
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
-					shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
-					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
-					List<Record> aggregatedRecords = getRecordsResult.getRecords();
-					int numberOfAggregatedRecords = aggregatedRecords.size();
-					shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
-
-					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						aggregatedRecords,
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-					long recordBatchSizeBytes = 0L;
-					for (UserRecord record : fetchedRecords) {
-						recordBatchSizeBytes += record.getData().remaining();
-						deserializeRecordForCollectionAndUpdateState(record);
-					}
-
-					int numberOfDeaggregatedRecords = fetchedRecords.size();
-					shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
-
-					nextShardItr = getRecordsResult.getNextShardIterator();
-
-					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
-					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
-					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
-					shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
-					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
+					startingPosition = StartingPosition.continueFromSequenceNumber(lastSequenceNum);

Review comment:
       Why does the `ShardConsumer` need to keep track of and update the `startingPosition`?
   It feels like this should be internal state of the record publisher; as I understand it, the record publisher only ever respects the `nextShardItr` that it maintains anyway.
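   
    i.e. something like this inside the publisher (a sketch; the field name is hypothetical):
   
    ```
    	// The starting position is only consulted to seed the very first shard
    	// iterator; after that, nextShardItr drives all subsequent fetches.
    	if (nextShardItr == null) {
    		nextShardItr = getShardIterator(initialStartingPosition); // a field set once at construction
    	}
    ```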

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -53,283 +49,111 @@
  */
 @Internal
 public class ShardConsumer<T> implements Runnable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
-
-	// AWS Kinesis has a read limit of 2 Mb/sec
-	// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
-	private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
-
 	private final KinesisDeserializationSchema<T> deserializer;
 
-	private final KinesisProxyInterface kinesis;
-
 	private final int subscribedShardStateIndex;
 
 	private final KinesisDataFetcher<T> fetcherRef;
 
 	private final StreamShardHandle subscribedShard;
 
-	private int maxNumberOfRecordsPerFetch;
-	private final long fetchIntervalMillis;
-	private final boolean useAdaptiveReads;
+	private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
 
-	private final ShardMetricsReporter shardMetricsReporter;
+	private StartingPosition startingPosition;
 
 	private SequenceNumber lastSequenceNum;
 
-	private Date initTimestamp;
+	private final RecordPublisher recordPublisher;
 
 	/**
 	 * Creates a shard consumer.
 	 *
 	 * @param fetcherRef reference to the owning fetcher
+	 * @param recordPublisher the record publisher used to read records from kinesis
 	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
 	 * @param subscribedShard the shard this consumer is subscribed to
 	 * @param lastSequenceNum the sequence number in the shard to start consuming
-	 * @param kinesis the proxy instance to interact with Kinesis
-	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @param shardConsumerMetricsReporter the reporter to report metrics to
+	 * @param shardDeserializer used to deserialize incoming records
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+						RecordPublisher recordPublisher,
 						Integer subscribedShardStateIndex,
 						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum,
-						KinesisProxyInterface kinesis,
-						ShardMetricsReporter shardMetricsReporter,
+						ShardConsumerMetricsReporter shardConsumerMetricsReporter,
 						KinesisDeserializationSchema<T> shardDeserializer) {
 		this.fetcherRef = checkNotNull(fetcherRef);
+		this.recordPublisher = checkNotNull(recordPublisher);
 		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
 		this.subscribedShard = checkNotNull(subscribedShard);
+		this.shardConsumerMetricsReporter = checkNotNull(shardConsumerMetricsReporter);
 		this.lastSequenceNum = checkNotNull(lastSequenceNum);
 
-		this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
 		checkArgument(
 			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
 			"Should not start a ShardConsumer if the shard has already been completely read.");
 
 		this.deserializer = shardDeserializer;
 
 		Properties consumerConfig = fetcherRef.getConsumerConfiguration();
-		this.kinesis = kinesis;
-		this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-			Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-		this.useAdaptiveReads = Boolean.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
-			Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
-		if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+
+		if (lastSequenceNum.equals(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
+			Date initTimestamp;
 			String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
 
 			try {
 				String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
 					ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
 				SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
-				this.initTimestamp = customDateFormat.parse(timestamp);
+				initTimestamp = customDateFormat.parse(timestamp);
 			} catch (IllegalArgumentException | NullPointerException exception) {
 				throw new IllegalArgumentException(exception);
 			} catch (ParseException exception) {
-				this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
+				initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
 			}
-		} else {
-			this.initTimestamp = null;
-		}
-	}
-
-	/**
-	 * Returns a shard iterator for the given {@link SequenceNumber}.
-	 *
-	 * @return shard iterator
-	 * @throws Exception
-	 */
-	protected String getShardIterator(SequenceNumber sequenceNumber) throws Exception {
 
-		if (isSentinelSequenceNumber(sequenceNumber)) {
-			return getShardIteratorForSentinel(sequenceNumber);
+			startingPosition = StartingPosition.fromTimestamp(initTimestamp);
 		} else {
-			// we will be starting from an actual sequence number (due to restore from failure).
-			return getShardIteratorForRealSequenceNumber(sequenceNumber);
+			startingPosition = StartingPosition.restartFromSequenceNumber(checkNotNull(lastSequenceNum));
 		}
 	}
 
-	protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {

Review comment:
       👍 really nice to see this go away :)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import java.util.function.Consumer;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on transactions per second and throughput. This class attempts to balance
+ * quotas and mitigate backoff errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {

Review comment:
       👍 really like the approach you took in refactoring this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org