You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/08/24 06:39:48 UTC
[flink] branch master updated: [FLINK-18512][kinesis] Introducing
RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5487fcf [FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher
5487fcf is described below
commit 5487fcf2c3451e8ec4a21160f370226a2f4dc1f5
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Fri Jul 10 22:12:23 2020 +0100
[FLINK-18512][kinesis] Introducing RecordPublisher. Refactor ShardConsumer to use PollingRecordPublisher
This closes #12881.
---
.../internals/DynamoDBStreamsDataFetcher.java | 15 +-
.../kinesis/internals/KinesisDataFetcher.java | 62 ++--
.../kinesis/internals/ShardConsumer.java | 358 ++++-----------------
.../kinesis/internals/publisher/RecordBatch.java | 90 ++++++
.../internals/publisher/RecordPublisher.java | 57 ++++
.../publisher/RecordPublisherFactory.java | 48 +++
.../polling/AdaptivePollingRecordPublisher.java | 130 ++++++++
.../publisher/polling/PollingRecordPublisher.java | 168 ++++++++++
.../PollingRecordPublisherConfiguration.java | 62 ++++
.../polling/PollingRecordPublisherFactory.java | 88 +++++
... => PollingRecordPublisherMetricsReporter.java} | 47 +--
...rter.java => ShardConsumerMetricsReporter.java} | 55 +---
.../connectors/kinesis/model/StartingPosition.java | 109 +++++++
.../connectors/kinesis/proxy/KinesisProxy.java | 36 +--
.../streaming/connectors/kinesis/util/AWSUtil.java | 24 ++
.../connectors/kinesis/util/KinesisConfigUtil.java | 32 +-
.../kinesis/internals/KinesisDataFetcherTest.java | 2 +-
.../kinesis/internals/ShardConsumerTest.java | 55 ++--
.../internals/publisher/RecordBatchTest.java | 94 ++++++
.../PollingRecordPublisherConfigurationTest.java | 74 +++++
.../polling/PollingRecordPublisherFactoryTest.java | 69 ++++
.../polling/PollingRecordPublisherTest.java | 165 ++++++++++
.../PollingRecordPublisherMetricsReporterTest.java | 72 +++++
.../metrics/ShardConsumerMetricsReporterTest.java | 69 ++++
.../kinesis/model/StartingPositionTest.java | 96 ++++++
.../testutils/FakeKinesisBehavioursFactory.java | 21 +-
.../connectors/kinesis/testutils/TestUtils.java | 23 ++
.../connectors/kinesis/util/AWSUtilTest.java | 33 ++
.../kinesis/util/KinesisConfigUtilTest.java | 54 ++++
29 files changed, 1746 insertions(+), 462 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index e33a0c8..afa1c28 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -18,9 +18,10 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
@@ -89,7 +90,7 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
* @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
* @param handle stream handle
* @param lastSeqNum last sequence number
- * @param shardMetricsReporter the reporter to report metrics to
+ * @param metricGroup the metric group to report metrics to
* @return
*/
@Override
@@ -97,16 +98,16 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
Integer subscribedShardStateIndex,
StreamShardHandle handle,
SequenceNumber lastSeqNum,
- ShardMetricsReporter shardMetricsReporter,
- KinesisDeserializationSchema<T> shardDeserializer) {
+ MetricGroup metricGroup,
+ KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
- return new ShardConsumer(
+ return new ShardConsumer<T>(
this,
+ createRecordPublisher(lastSeqNum, getConsumerConfiguration(), metricGroup, handle),
subscribedShardStateIndex,
handle,
lastSeqNum,
- DynamoDBStreamsProxy.create(getConsumerConfiguration()),
- shardMetricsReporter,
+ new ShardConsumerMetricsReporter(metricGroup),
shardDeserializer);
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 59e2cb1..2be9b1c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,17 +27,22 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
@@ -181,6 +186,9 @@ public class KinesisDataFetcher<T> {
/** The Kinesis proxy that the fetcher will be using to discover new shards. */
private final KinesisProxyInterface kinesis;
+ /** The factory used to create record publishers that consume from Kinesis shards. */
+ private final RecordPublisherFactory recordPublisherFactory;
+
/** Thread that executed runFetcher(). */
private volatile Thread mainThread;
@@ -360,6 +368,7 @@ public class KinesisDataFetcher<T> {
this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
this.kinesis = kinesisProxyFactory.create(configProps);
+ this.recordPublisherFactory = createRecordPublisherFactory();
this.consumerMetricGroup = runtimeContext.getMetricGroup()
.addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
@@ -389,25 +398,40 @@ public class KinesisDataFetcher<T> {
* @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
* @param subscribedShard the shard this consumer is subscribed to
* @param lastSequenceNum the sequence number in the shard to start consuming
- * @param shardMetricsReporter the reporter to report metrics to
+ * @param metricGroup the metric group to report metrics to
* @return shard consumer
*/
protected ShardConsumer<T> createShardConsumer(
Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
- ShardMetricsReporter shardMetricsReporter,
- KinesisDeserializationSchema<T> shardDeserializer) {
+ MetricGroup metricGroup,
+ KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
+
return new ShardConsumer<>(
this,
+ createRecordPublisher(lastSequenceNum, configProps, metricGroup, subscribedShard),
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
- this.kinesisProxyFactory.create(configProps),
- shardMetricsReporter,
+ new ShardConsumerMetricsReporter(metricGroup),
shardDeserializer);
}
+ private RecordPublisherFactory createRecordPublisherFactory() {
+ return new PollingRecordPublisherFactory(kinesisProxyFactory);
+ }
+
+ protected RecordPublisher createRecordPublisher(
+ final SequenceNumber sequenceNumber,
+ final Properties configProps,
+ final MetricGroup metricGroup,
+ final StreamShardHandle subscribedShard) throws InterruptedException {
+
+ StartingPosition startingPosition = AWSUtil.getStartingPosition(sequenceNumber, configProps);
+ return recordPublisherFactory.create(startingPosition, configProps, metricGroup, subscribedShard);
+ }
+
/**
* Starts the fetcher. After starting the fetcher, it can only
* be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
@@ -475,7 +499,7 @@ public class KinesisDataFetcher<T> {
seededStateIndex,
streamShardHandle,
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum(),
- registerShardMetrics(consumerMetricGroup, subscribedShardsState.get(seededStateIndex)),
+ registerShardMetricGroup(consumerMetricGroup, subscribedShardsState.get(seededStateIndex)),
shardDeserializationSchema));
}
}
@@ -576,7 +600,7 @@ public class KinesisDataFetcher<T> {
newStateIndex,
newShardState.getStreamShardHandle(),
newShardState.getLastProcessedSequenceNum(),
- registerShardMetrics(consumerMetricGroup, newShardState),
+ registerShardMetricGroup(consumerMetricGroup, newShardState),
shardDeserializationSchema));
}
@@ -1057,29 +1081,16 @@ public class KinesisDataFetcher<T> {
/**
* Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
*
- * @return a {@link ShardMetricsReporter} that can be used to update metric values
+ * @return a {@link MetricGroup} that can be used to update metric values
*/
- private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState shardState) {
- ShardMetricsReporter shardMetrics = new ShardMetricsReporter();
-
- MetricGroup streamShardMetricGroup = metricGroup
+ private MetricGroup registerShardMetricGroup(final MetricGroup metricGroup, final KinesisStreamShardState shardState) {
+ return metricGroup
.addGroup(
KinesisConsumerMetricConstants.STREAM_METRICS_GROUP,
shardState.getStreamShardHandle().getStreamName())
.addGroup(
KinesisConsumerMetricConstants.SHARD_METRICS_GROUP,
shardState.getStreamShardHandle().getShard().getShardId());
-
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
- streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);
- return shardMetrics;
}
// ------------------------------------------------------------------------
@@ -1102,9 +1113,10 @@ public class KinesisDataFetcher<T> {
@VisibleForTesting
protected ExecutorService createShardConsumersThreadPool(final String subtaskName) {
return Executors.newCachedThreadPool(new ThreadFactory() {
+ private final AtomicLong threadCount = new AtomicLong(0);
+
@Override
public Thread newThread(Runnable runnable) {
- final AtomicLong threadCount = new AtomicLong(0);
Thread thread = new Thread(runnable);
thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
thread.setDaemon(true);
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 8e9c88b..d9c0d9d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -19,256 +19,108 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.isSentinelSequenceNumber;
+import static java.util.Optional.ofNullable;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
+ * Thread that subscribes to the given {@link RecordPublisher}. Each thread is in charge of one Kinesis shard only.
+ * <p>
+ * A {@link ShardConsumer} is responsible for:
+ * <ul>
+ * <li>Running the {@link RecordPublisher} to consume all records from the subscribed shard</li>
+ * <li>Deserializing and deaggregating incoming records from Kinesis</li>
+ * <li>Logging metrics</li>
+ * <li>Passing the records up to the {@link KinesisDataFetcher}</li>
+ * </ul>
+ * </p>
*/
@Internal
public class ShardConsumer<T> implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
-
- // AWS Kinesis has a read limit of 2 Mb/sec
- // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
- private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
-
private final KinesisDeserializationSchema<T> deserializer;
- private final KinesisProxyInterface kinesis;
-
private final int subscribedShardStateIndex;
private final KinesisDataFetcher<T> fetcherRef;
private final StreamShardHandle subscribedShard;
- private int maxNumberOfRecordsPerFetch;
- private final long fetchIntervalMillis;
- private final boolean useAdaptiveReads;
-
- private final ShardMetricsReporter shardMetricsReporter;
+ private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
private SequenceNumber lastSequenceNum;
- private Date initTimestamp;
+ private final RecordPublisher recordPublisher;
/**
* Creates a shard consumer.
*
* @param fetcherRef reference to the owning fetcher
+ * @param recordPublisher the record publisher used to read records from kinesis
* @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
* @param subscribedShard the shard this consumer is subscribed to
* @param lastSequenceNum the sequence number in the shard to start consuming
- * @param kinesis the proxy instance to interact with Kinesis
- * @param shardMetricsReporter the reporter to report metrics to
+ * @param shardConsumerMetricsReporter the reporter to report metrics to
+ * @param shardDeserializer used to deserialize incoming records
*/
public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+ RecordPublisher recordPublisher,
Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
- KinesisProxyInterface kinesis,
- ShardMetricsReporter shardMetricsReporter,
+ ShardConsumerMetricsReporter shardConsumerMetricsReporter,
KinesisDeserializationSchema<T> shardDeserializer) {
this.fetcherRef = checkNotNull(fetcherRef);
+ this.recordPublisher = checkNotNull(recordPublisher);
this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
+ this.shardConsumerMetricsReporter = checkNotNull(shardConsumerMetricsReporter);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
- this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
-
checkArgument(
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
"Should not start a ShardConsumer if the shard has already been completely read.");
this.deserializer = shardDeserializer;
-
- Properties consumerConfig = fetcherRef.getConsumerConfiguration();
- this.kinesis = kinesis;
- this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
- Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
- this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
- Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
- this.useAdaptiveReads = Boolean.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
- Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
-
- if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
- String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
-
- try {
- String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
- ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
- SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
- this.initTimestamp = customDateFormat.parse(timestamp);
- } catch (IllegalArgumentException | NullPointerException exception) {
- throw new IllegalArgumentException(exception);
- } catch (ParseException exception) {
- this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
- }
- } else {
- this.initTimestamp = null;
- }
- }
-
- /**
- * Returns a shard iterator for the given {@link SequenceNumber}.
- *
- * @return shard iterator
- * @throws Exception
- */
- protected String getShardIterator(SequenceNumber sequenceNumber) throws Exception {
-
- if (isSentinelSequenceNumber(sequenceNumber)) {
- return getShardIteratorForSentinel(sequenceNumber);
- } else {
- // we will be starting from an actual sequence number (due to restore from failure).
- return getShardIteratorForRealSequenceNumber(sequenceNumber);
- }
- }
-
- protected String getShardIteratorForSentinel(SequenceNumber sentinelSequenceNumber) throws InterruptedException {
- String nextShardItr;
-
- if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
- // if the shard is already closed, there will be no latest next record to get for this shard
- if (subscribedShard.isClosed()) {
- nextShardItr = null;
- } else {
- nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
- }
- } else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
- nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
- } else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
- nextShardItr = null;
- } else if (sentinelSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
- nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
- } else {
- throw new RuntimeException("Unknown sentinel type: " + sentinelSequenceNumber);
- }
-
- return nextShardItr;
- }
-
- protected String getShardIteratorForRealSequenceNumber(SequenceNumber sequenceNumber)
- throws Exception {
-
- // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
- // from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
- if (sequenceNumber.isAggregated()) {
- return getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
- } else {
- // the last record was non-aggregated, so we can simply start from the next record
- return kinesis.getShardIterator(
- subscribedShard,
- ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
- sequenceNumber.getSequenceNumber());
- }
- }
-
- protected String getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber)
- throws Exception {
-
- String itrForLastAggregatedRecord =
- kinesis.getShardIterator(
- subscribedShard,
- ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
- sequenceNumber.getSequenceNumber());
-
- // get only the last aggregated record
- GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
- List<UserRecord> fetchedRecords = deaggregateRecords(
- getRecordsResult.getRecords(),
- subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
- subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
- long lastSubSequenceNum = sequenceNumber.getSubSequenceNumber();
- for (UserRecord record : fetchedRecords) {
- // we have found a dangling sub-record if it has a larger subsequence number
- // than our last sequence number; if so, collect the record and update state
- if (record.getSubSequenceNumber() > lastSubSequenceNum) {
- deserializeRecordForCollectionAndUpdateState(record);
- }
- }
-
- return getRecordsResult.getNextShardIterator();
}
- @SuppressWarnings("unchecked")
@Override
public void run() {
try {
- String nextShardItr = getShardIterator(lastSequenceNum);
+ while (isRunning()) {
+ final RecordPublisherRunResult result = recordPublisher.run(batch -> {
+ for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
+ if (filterDeaggregatedRecord(userRecord)) {
+ deserializeRecordForCollectionAndUpdateState(userRecord);
+ }
+ }
- long processingStartTimeNanos = System.nanoTime();
+ shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
+ shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
+ shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
+ ofNullable(batch.getMillisBehindLatest()).ifPresent(shardConsumerMetricsReporter::setMillisBehindLatest);
- while (isRunning()) {
- if (nextShardItr == null) {
+ return lastSequenceNum;
+ });
+
+ if (result == COMPLETE) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
// we can close this consumer thread once we've reached the end of the subscribed shard
break;
- } else {
- shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
- GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
- List<Record> aggregatedRecords = getRecordsResult.getRecords();
- int numberOfAggregatedRecords = aggregatedRecords.size();
- shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
-
- // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
- List<UserRecord> fetchedRecords = deaggregateRecords(
- aggregatedRecords,
- subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
- subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
- long recordBatchSizeBytes = 0L;
- for (UserRecord record : fetchedRecords) {
- recordBatchSizeBytes += record.getData().remaining();
- deserializeRecordForCollectionAndUpdateState(record);
- }
-
- int numberOfDeaggregatedRecords = fetchedRecords.size();
- shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
-
- nextShardItr = getRecordsResult.getNextShardIterator();
-
- long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
- long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
- maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
- shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
- processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
@@ -277,59 +129,9 @@ public class ShardConsumer<T> implements Runnable {
}
/**
- * Adjusts loop timing to match target frequency if specified.
- * @param processingStartTimeNanos The start time of the run loop "work"
- * @param processingEndTimeNanos The end time of the run loop "work"
- * @return The System.nanoTime() after the sleep (if any)
- * @throws InterruptedException
- */
- protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
- throws InterruptedException {
- long endTimeNanos = processingEndTimeNanos;
- if (fetchIntervalMillis != 0) {
- long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
- long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
- if (sleepTimeMillis > 0) {
- Thread.sleep(sleepTimeMillis);
- endTimeNanos = System.nanoTime();
- shardMetricsReporter.setSleepTimeMillis(sleepTimeMillis);
- }
- }
- return endTimeNanos;
- }
-
- /**
- * Calculates how many records to read each time through the loop based on a target throughput
- * and the measured frequenecy of the loop.
- * @param runLoopTimeNanos The total time of one pass through the loop
- * @param numRecords The number of records of the last read operation
- * @param recordBatchSizeBytes The total batch size of the last read operation
- * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
- */
- private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
- int maxNumberOfRecordsPerFetch) {
- if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
- long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
- // Adjust number of records to fetch from the shard depending on current average record size
- // to optimize 2 Mb / sec read limits
- double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
- double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
- maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
- // Ensure the value is greater than 0 and not more than 10000L
- maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
-
- // Set metrics
- shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
- shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz);
- shardMetricsReporter.setBytesPerRead(bytesPerRead);
- }
- return maxNumberOfRecordsPerFetch;
- }
-
- /**
* The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
- * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
- * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
+ * by the ExecutorService {@code KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
+ * would be by calling shutdownNow() on {@code KinesisDataFetcher#shardConsumersExecutor} and let the executor service
* interrupt all currently running {@link ShardConsumer}s.
*/
private boolean isRunning() {
@@ -338,18 +140,16 @@ public class ShardConsumer<T> implements Runnable {
/**
* Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
- * successfully collected sequence number in this shard consumer is also updated so that
- * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
+ * successfully collected sequence number in this shard consumer is also updated so that a
+ * {@link RecordPublisher} may be able to use the correct sequence number to refresh shard
* iterators if necessary.
*
* <p>Note that the server-side Kinesis timestamp is attached to the record when collected. When the
* user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
*
* @param record record to deserialize and collect
- * @throws IOException
*/
- private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
- throws IOException {
+ private void deserializeRecordForCollectionAndUpdateState(final UserRecord record) {
ByteBuffer recordData = record.getData();
byte[] dataBytes = new byte[recordData.remaining()];
@@ -357,13 +157,18 @@ public class ShardConsumer<T> implements Runnable {
final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
- final T value = deserializer.deserialize(
- dataBytes,
- record.getPartitionKey(),
- record.getSequenceNumber(),
- approxArrivalTimestamp,
- subscribedShard.getStreamName(),
- subscribedShard.getShard().getShardId());
+ final T value;
+ try {
+ value = deserializer.deserialize(
+ dataBytes,
+ record.getPartitionKey(),
+ record.getSequenceNumber(),
+ approxArrivalTimestamp,
+ subscribedShard.getStreamName(),
+ subscribedShard.getShard().getShardId());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
SequenceNumber collectedSequenceNumber = (record.isAggregated())
? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
@@ -375,53 +180,22 @@ public class ShardConsumer<T> implements Runnable {
subscribedShardStateIndex,
collectedSequenceNumber);
- lastSequenceNum = collectedSequenceNumber;
+ this.lastSequenceNum = collectedSequenceNumber;
}
/**
- * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
- * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
- * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
- * be used for the next call to this method.
- *
- * <p>Note: it is important that this method is not called again before all the records from the last result have been
- * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
- * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
- * incorrect shard iteration if the iterator had to be refreshed.
+ * Filters out aggregated records that have previously been processed.
+ * This method is to support restarting from a partially consumed aggregated sequence number.
*
- * @param shardItr shard iterator to use
- * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
- * @return get records result
- * @throws InterruptedException
+ * @param record the record to filter
+ * @return {@code true} if the record should be retained
*/
- private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws Exception {
- GetRecordsResult getRecordsResult = null;
- while (getRecordsResult == null) {
- try {
- getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-
- // Update millis behind latest so it gets reported by the millisBehindLatest gauge
- Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
- if (millisBehindLatest != null) {
- shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
- }
- } catch (ExpiredIteratorException eiEx) {
- LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
- " refreshing the iterator ...", shardItr, subscribedShard);
-
- shardItr = getShardIterator(lastSequenceNum);
-
- // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
- if (fetchIntervalMillis != 0) {
- Thread.sleep(fetchIntervalMillis);
- }
- }
+ private boolean filterDeaggregatedRecord(final UserRecord record) {
+ if (!lastSequenceNum.isAggregated()) {
+ return true;
}
- return getRecordsResult;
- }
- @SuppressWarnings("unchecked")
- protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
- return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
+ return !record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber()) ||
+ record.getSubSequenceNumber() > lastSequenceNum.getSubSequenceNumber();
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java
new file mode 100644
index 0000000..f6ce271
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatch.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/**
+ * A batch of UserRecords received from Kinesis.
+ * Input records are de-aggregated using KCL 1.x library.
+ * It is expected that AWS SDK v2.x messages are converted to KCL 1.x {@link UserRecord}.
+ */
+@Internal
+public class RecordBatch {
+
+ private final int aggregatedRecordSize;
+
+ private final List<UserRecord> deaggregatedRecords;
+
+ private final long totalSizeInBytes;
+
+ private final Long millisBehindLatest;
+
+ public RecordBatch(
+ final List<Record> records,
+ final StreamShardHandle subscribedShard,
+ @Nullable final Long millisBehindLatest) {
+ Preconditions.checkNotNull(subscribedShard);
+ this.aggregatedRecordSize = Preconditions.checkNotNull(records).size();
+ this.deaggregatedRecords = deaggregateRecords(records, subscribedShard);
+ this.totalSizeInBytes = this.deaggregatedRecords.stream().mapToInt(r -> r.getData().remaining()).sum();
+ this.millisBehindLatest = millisBehindLatest;
+ }
+
+ public int getAggregatedRecordSize() {
+ return aggregatedRecordSize;
+ }
+
+ public int getDeaggregatedRecordSize() {
+ return deaggregatedRecords.size();
+ }
+
+ public List<UserRecord> getDeaggregatedRecords() {
+ return deaggregatedRecords;
+ }
+
+ public long getTotalSizeInBytes() {
+ return totalSizeInBytes;
+ }
+
+ public long getAverageRecordSizeBytes() {
+ return deaggregatedRecords.isEmpty() ? 0 : getTotalSizeInBytes() / getDeaggregatedRecordSize();
+ }
+
+ @Nullable
+ public Long getMillisBehindLatest() {
+ return millisBehindLatest;
+ }
+
+ private List<UserRecord> deaggregateRecords(final List<Record> records, final StreamShardHandle subscribedShard) {
+ BigInteger start = new BigInteger(subscribedShard.getShard().getHashKeyRange().getStartingHashKey());
+ BigInteger end = new BigInteger(subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+ return UserRecord.deaggregate(records, start, end);
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
new file mode 100644
index 0000000..29e4fb4
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+
+/**
+ * A {@code RecordPublisher} will consume records from an external stream and deliver them to the registered subscriber.
+ */
+@Internal
+public interface RecordPublisher {
+
+ /**
+ * Run the record publisher. Records will be consumed from the stream and published to the consumer.
+ * The number of batches retrieved by a single invocation will vary based on implementation.
+ *
+ * @param recordBatchConsumer the record batch consumer in which to output records
+ * @return a status enum to represent whether a shard has been fully consumed
+ * @throws InterruptedException
+ */
+ RecordPublisherRunResult run(RecordBatchConsumer recordBatchConsumer) throws InterruptedException;
+
+ /**
+ * A status enum to represent whether a shard has been fully consumed.
+ */
+ enum RecordPublisherRunResult {
+ /** There are no more records to consume from this shard. */
+ COMPLETE,
+
+ /** There are more records to consume from this shard. */
+ INCOMPLETE
+ }
+
+ /**
+ * An interface used to collect record batches, and reply with the latest consumed sequence number.
+ */
+ interface RecordBatchConsumer {
+
+ SequenceNumber accept(RecordBatch recordBatch);
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
new file mode 100644
index 0000000..0dcd0cb
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.util.Properties;
+
+/**
+ * A factory interface used to create instances of {@link RecordPublisher}.
+ */
+@Internal
+public interface RecordPublisherFactory {
+
+ /**
+ * Create a {@link RecordPublisher}.
+ *
+ * @param startingPosition the position in the shard to start consuming records from
+ * @param consumerConfig the properties used to configure the {@link RecordPublisher}.
+ * @param metricGroup the {@link MetricGroup} used to report metrics to
+ * @param streamShardHandle the stream shard to consume from
+ * @return the constructed {@link RecordPublisher}
+ */
+ RecordPublisher create(
+ StartingPosition startingPosition,
+ Properties consumerConfig,
+ MetricGroup metricGroup,
+ StreamShardHandle streamShardHandle) throws InterruptedException;
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
new file mode 100644
index 0000000..bdb7167
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/AdaptivePollingRecordPublisher.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+/**
+ * An adaptive record publisher to add a dynamic loop delay and batch read size for {@link PollingRecordPublisher}.
+ * Kinesis Streams have quotas on the transactions per second, and throughput. This class attempts to balance
+ * quotas and mitigate back off errors.
+ */
+@Internal
+public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
+ // AWS Kinesis has a per-shard read limit of 2 MB/sec
+ // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+ private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
+
+ private int lastRecordBatchSize = 0;
+
+ private long lastRecordBatchSizeInBytes = 0;
+
+ private long processingStartTimeNanos = System.nanoTime();
+
+ private int maxNumberOfRecordsPerFetch;
+
+ private final long fetchIntervalMillis;
+
+ private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+ AdaptivePollingRecordPublisher(
+ final StartingPosition startingPosition,
+ final StreamShardHandle subscribedShard,
+ final PollingRecordPublisherMetricsReporter metricsReporter,
+ final KinesisProxyInterface kinesisProxy,
+ final int maxNumberOfRecordsPerFetch,
+ final long fetchIntervalMillis) throws InterruptedException {
+ super(startingPosition, subscribedShard, metricsReporter, kinesisProxy, maxNumberOfRecordsPerFetch, fetchIntervalMillis);
+ this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+ this.fetchIntervalMillis = fetchIntervalMillis;
+ this.metricsReporter = metricsReporter;
+ }
+
+ @Override
+ public RecordPublisherRunResult run(final RecordBatchConsumer consumer) throws InterruptedException {
+ final RecordPublisherRunResult result = super.run(batch -> {
+ SequenceNumber latestSequenceNumber = consumer.accept(batch);
+ lastRecordBatchSize = batch.getDeaggregatedRecordSize();
+ lastRecordBatchSizeInBytes = batch.getTotalSizeInBytes();
+ return latestSequenceNumber;
+ }, maxNumberOfRecordsPerFetch);
+
+ long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
+ long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+ maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, lastRecordBatchSize, lastRecordBatchSizeInBytes, maxNumberOfRecordsPerFetch);
+ processingStartTimeNanos = adjustmentEndTimeNanos;
+ metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
+
+ return result;
+ }
+
+ /**
+ * Adjusts loop timing to match target frequency if specified.
+ * @param processingStartTimeNanos The start time of the run loop "work"
+ * @param processingEndTimeNanos The end time of the run loop "work"
+ * @return The System.nanoTime() after the sleep (if any)
+ * @throws InterruptedException
+ */
+ private long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+ throws InterruptedException {
+ long endTimeNanos = processingEndTimeNanos;
+ if (fetchIntervalMillis != 0) {
+ long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+ long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+ if (sleepTimeMillis > 0) {
+ Thread.sleep(sleepTimeMillis);
+ endTimeNanos = System.nanoTime();
+ metricsReporter.setSleepTimeMillis(sleepTimeMillis);
+ }
+ }
+ return endTimeNanos;
+ }
+
+ /**
+ * Calculates how many records to read each time through the loop based on a target throughput
+ * and the measured frequency of the loop.
+ * @param runLoopTimeNanos The total time of one pass through the loop
+ * @param numRecords The number of records of the last read operation
+ * @param recordBatchSizeBytes The total batch size of the last read operation
+ * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
+ */
+ private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
+ int maxNumberOfRecordsPerFetch) {
+ if (numRecords != 0 && runLoopTimeNanos != 0) {
+ long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+ // Adjust number of records to fetch from the shard depending on current average record size
+ // to optimize 2 MB / sec read limits
+ double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+ double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+ maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+ // Clamp the value to the range [1, DEFAULT_SHARD_GETRECORDS_MAX]
+ maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
+
+ // Set metrics
+ metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
+ metricsReporter.setBytesPerRead(bytesPerRead);
+ }
+ return maxNumberOfRecordsPerFetch;
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
new file mode 100644
index 0000000..4edc1f0
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+
+/**
+ * A {@link RecordPublisher} that will read records from Kinesis and forward them to the subscriber.
+ * Records are consumed by polling the GetRecords KDS API using a ShardIterator.
+ */
+@Internal
+public class PollingRecordPublisher implements RecordPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
+
+ private final PollingRecordPublisherMetricsReporter metricsReporter;
+
+ private final KinesisProxyInterface kinesisProxy;
+
+ private final StreamShardHandle subscribedShard;
+
+ private String nextShardItr;
+
+ private StartingPosition nextStartingPosition;
+
+ private final int maxNumberOfRecordsPerFetch;
+
+ private final long expiredIteratorBackoffMillis;
+
+ /**
+ * A Polling implementation of {@link RecordPublisher} that polls kinesis for records.
+ * The following KDS services are used: GetRecords and GetShardIterator.
+ *
+ * @param startingPosition the position in the stream to start consuming from
+ * @param subscribedShard the shard to consume from
+ * @param metricsReporter a metric reporter used to output metrics
+ * @param kinesisProxy the proxy used to communicate with kinesis
+ * @param maxNumberOfRecordsPerFetch the maximum number of records to retrieve per batch
+ * @param expiredIteratorBackoffMillis the duration to sleep in the event of an {@link ExpiredIteratorException}
+ */
+ PollingRecordPublisher(
+ final StartingPosition startingPosition,
+ final StreamShardHandle subscribedShard,
+ final PollingRecordPublisherMetricsReporter metricsReporter,
+ final KinesisProxyInterface kinesisProxy,
+ final int maxNumberOfRecordsPerFetch,
+ final long expiredIteratorBackoffMillis) throws InterruptedException {
+ this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
+ this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
+ this.metricsReporter = Preconditions.checkNotNull(metricsReporter);
+ this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
+ this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+ this.expiredIteratorBackoffMillis = expiredIteratorBackoffMillis;
+
+ Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0);
+ Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0);
+
+ this.nextShardItr = getShardIterator();
+ }
+
+ @Override
+ public RecordPublisherRunResult run(final RecordBatchConsumer consumer) throws InterruptedException {
+ return run(consumer, maxNumberOfRecordsPerFetch);
+ }
+
+ public RecordPublisherRunResult run(final RecordBatchConsumer consumer, int maxNumberOfRecords) throws InterruptedException {
+ if (nextShardItr == null) {
+ return COMPLETE;
+ }
+
+ metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
+
+ GetRecordsResult result = getRecords(nextShardItr, maxNumberOfRecords);
+
+ RecordBatch recordBatch = new RecordBatch(result.getRecords(), subscribedShard, result.getMillisBehindLatest());
+ SequenceNumber latestSeequenceNumber = consumer.accept(recordBatch);
+
+ nextStartingPosition = StartingPosition.continueFromSequenceNumber(latestSeequenceNumber);
+ nextShardItr = result.getNextShardIterator();
+ return nextShardItr == null ? COMPLETE : INCOMPLETE;
+ }
+
+ /**
+ * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+ * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+ * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+ * be used for the next call to this method.
+ *
+ * <p>Note: it is important that this method is not called again before all the records from the last result have been
+ * fully collected with {@code ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+ * {@code ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+ * incorrect shard iteration if the iterator had to be refreshed.
+ *
+ * @param shardItr shard iterator to use
+ * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+ * @return get records result
+ */
+ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+ GetRecordsResult getRecordsResult = null;
+ while (getRecordsResult == null) {
+ try {
+ getRecordsResult = kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
+ } catch (ExpiredIteratorException eiEx) {
+ LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+ " refreshing the iterator ...", shardItr, subscribedShard);
+
+ shardItr = getShardIterator();
+
+ // sleep for the expired-iterator backoff before the next getRecords attempt with the refreshed iterator
+ if (expiredIteratorBackoffMillis != 0) {
+ Thread.sleep(expiredIteratorBackoffMillis);
+ }
+ }
+ }
+ return getRecordsResult;
+ }
+
+ /**
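+ * Returns a shard iterator for the current {@code nextStartingPosition} of the subscribed shard.
+ *
+ * @return shard iterator, or {@code null} if the starting position is LATEST and the shard is closed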
+ */
+ @Nullable
+ private String getShardIterator() throws InterruptedException {
+ if (nextStartingPosition.getShardIteratorType() == LATEST && subscribedShard.isClosed()) {
+ return null;
+ }
+
+ return kinesisProxy.getShardIterator(
+ subscribedShard,
+ nextStartingPosition.getShardIteratorType().toString(),
+ nextStartingPosition.getStartingMarker());
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfiguration.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfiguration.java
new file mode 100644
index 0000000..d5a6baf
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * A configuration class for {@link PollingRecordPublisher} instantiated from a properties map.
+ */
+@Internal
+public class PollingRecordPublisherConfiguration {
+
+ private final boolean adaptiveReads;
+
+ private final int maxNumberOfRecordsPerFetch;
+
+ private final long fetchIntervalMillis;
+
+ public PollingRecordPublisherConfiguration(final Properties consumerConfig) {
+ this.maxNumberOfRecordsPerFetch = Integer.parseInt(consumerConfig.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+ Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
+
+ this.fetchIntervalMillis = Long.parseLong(consumerConfig.getProperty(
+ ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+ Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+
+ this.adaptiveReads = Boolean.parseBoolean(consumerConfig.getProperty(
+ ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS,
+ Boolean.toString(ConsumerConfigConstants.DEFAULT_SHARD_USE_ADAPTIVE_READS)));
+ }
+
+ public boolean isAdaptiveReads() {
+ return adaptiveReads;
+ }
+
+ public int getMaxNumberOfRecordsPerFetch() {
+ return maxNumberOfRecordsPerFetch;
+ }
+
+ public long getFetchIntervalMillis() {
+ return fetchIntervalMillis;
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
new file mode 100644
index 0000000..ee5034f
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.FlinkKinesisProxyFactory;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A {@link RecordPublisher} factory used to create instances of {@link PollingRecordPublisher}.
+ */
+@Internal
+public class PollingRecordPublisherFactory implements RecordPublisherFactory {
+
+ private final FlinkKinesisProxyFactory kinesisProxyFactory;
+
+ public PollingRecordPublisherFactory(FlinkKinesisProxyFactory kinesisProxyFactory) {
+ this.kinesisProxyFactory = kinesisProxyFactory;
+ }
+
+ /**
+ * Create a {@link PollingRecordPublisher}.
+ * An {@link AdaptivePollingRecordPublisher} will be created should adaptive reads be enabled in the configuration.
+ *
+ * @param startingPosition the position in the shard to start consuming records from
+ * @param consumerConfig the consumer configuration properties
+ * @param metricGroup the metric group to report metrics to
+ * @param streamShardHandle the shard this consumer is subscribed to
+ * @return a {@link PollingRecordPublisher}
+ */
+ @Override
+ public PollingRecordPublisher create(
+ final StartingPosition startingPosition,
+ final Properties consumerConfig,
+ final MetricGroup metricGroup,
+ final StreamShardHandle streamShardHandle) throws InterruptedException {
+ Preconditions.checkNotNull(startingPosition);
+ Preconditions.checkNotNull(consumerConfig);
+ Preconditions.checkNotNull(metricGroup);
+ Preconditions.checkNotNull(streamShardHandle);
+
+ final PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(consumerConfig);
+ final PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(metricGroup);
+ final KinesisProxyInterface kinesisProxy = kinesisProxyFactory.create(consumerConfig);
+
+ if (configuration.isAdaptiveReads()) {
+ return new AdaptivePollingRecordPublisher(
+ startingPosition,
+ streamShardHandle,
+ metricsReporter,
+ kinesisProxy,
+ configuration.getMaxNumberOfRecordsPerFetch(),
+ configuration.getFetchIntervalMillis());
+ } else {
+ return new PollingRecordPublisher(
+ startingPosition,
+ streamShardHandle,
+ metricsReporter,
+ kinesisProxy,
+ configuration.getMaxNumberOfRecordsPerFetch(),
+ configuration.getFetchIntervalMillis());
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporter.java
similarity index 62%
copy from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
copy to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporter.java
index 4a27b9c..c0f77a8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporter.java
@@ -19,30 +19,27 @@
package org.apache.flink.streaming.connectors.kinesis.metrics;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher;
/**
- * A container for {@link ShardConsumer}s to report metric values.
+ * A container for {@link PollingRecordPublisher}s to report metric values.
*/
@Internal
-public class ShardMetricsReporter {
+public class PollingRecordPublisherMetricsReporter {
- private volatile long millisBehindLatest = -1;
private volatile double loopFrequencyHz = 0.0;
private volatile double bytesPerRead = 0.0;
private volatile long runLoopTimeNanos = 0L;
- private volatile long averageRecordSizeBytes = 0L;
private volatile long sleepTimeMillis = 0L;
- private volatile int numberOfAggregatedRecords = 0;
- private volatile int numberOfDeaggregatedRecords = 0;
private volatile int maxNumberOfRecordsPerFetch = 0;
- public long getMillisBehindLatest() {
- return millisBehindLatest;
- }
-
- public void setMillisBehindLatest(long millisBehindLatest) {
- this.millisBehindLatest = millisBehindLatest;
+ public PollingRecordPublisherMetricsReporter(final MetricGroup metricGroup) {
+ metricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, this::getMaxNumberOfRecordsPerFetch);
+ metricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, this::getBytesPerRead);
+ metricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, this::getRunLoopTimeNanos);
+ metricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, this::getLoopFrequencyHz);
+ metricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, this::getSleepTimeMillis);
}
public double getLoopFrequencyHz() {
@@ -69,14 +66,6 @@ public class ShardMetricsReporter {
this.runLoopTimeNanos = runLoopTimeNanos;
}
- public long getAverageRecordSizeBytes() {
- return averageRecordSizeBytes;
- }
-
- public void setAverageRecordSizeBytes(long averageRecordSizeBytes) {
- this.averageRecordSizeBytes = averageRecordSizeBytes;
- }
-
public long getSleepTimeMillis() {
return sleepTimeMillis;
}
@@ -85,22 +74,6 @@ public class ShardMetricsReporter {
this.sleepTimeMillis = sleepTimeMillis;
}
- public int getNumberOfAggregatedRecords() {
- return numberOfAggregatedRecords;
- }
-
- public void setNumberOfAggregatedRecords(int numberOfAggregatedRecords) {
- this.numberOfAggregatedRecords = numberOfAggregatedRecords;
- }
-
- public int getNumberOfDeaggregatedRecords() {
- return numberOfDeaggregatedRecords;
- }
-
- public void setNumberOfDeaggregatedRecords(int numberOfDeaggregatedRecords) {
- this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
- }
-
public int getMaxNumberOfRecordsPerFetch() {
return maxNumberOfRecordsPerFetch;
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
similarity index 64%
rename from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
rename to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
index 4a27b9c..81cce80 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporter.java
@@ -19,23 +19,26 @@
package org.apache.flink.streaming.connectors.kinesis.metrics;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
/**
* A container for {@link ShardConsumer}s to report metric values.
*/
@Internal
-public class ShardMetricsReporter {
+public class ShardConsumerMetricsReporter {
private volatile long millisBehindLatest = -1;
- private volatile double loopFrequencyHz = 0.0;
- private volatile double bytesPerRead = 0.0;
- private volatile long runLoopTimeNanos = 0L;
private volatile long averageRecordSizeBytes = 0L;
- private volatile long sleepTimeMillis = 0L;
private volatile int numberOfAggregatedRecords = 0;
private volatile int numberOfDeaggregatedRecords = 0;
- private volatile int maxNumberOfRecordsPerFetch = 0;
+
+ public ShardConsumerMetricsReporter(final MetricGroup metricGroup) {
+ metricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, this::getMillisBehindLatest);
+ metricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, this::getNumberOfAggregatedRecords);
+ metricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, this::getNumberOfDeaggregatedRecords);
+ metricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, this::getAverageRecordSizeBytes);
+ }
public long getMillisBehindLatest() {
return millisBehindLatest;
@@ -45,30 +48,6 @@ public class ShardMetricsReporter {
this.millisBehindLatest = millisBehindLatest;
}
- public double getLoopFrequencyHz() {
- return loopFrequencyHz;
- }
-
- public void setLoopFrequencyHz(double loopFrequencyHz) {
- this.loopFrequencyHz = loopFrequencyHz;
- }
-
- public double getBytesPerRead() {
- return bytesPerRead;
- }
-
- public void setBytesPerRead(double bytesPerRead) {
- this.bytesPerRead = bytesPerRead;
- }
-
- public long getRunLoopTimeNanos() {
- return runLoopTimeNanos;
- }
-
- public void setRunLoopTimeNanos(long runLoopTimeNanos) {
- this.runLoopTimeNanos = runLoopTimeNanos;
- }
-
public long getAverageRecordSizeBytes() {
return averageRecordSizeBytes;
}
@@ -77,14 +56,6 @@ public class ShardMetricsReporter {
this.averageRecordSizeBytes = averageRecordSizeBytes;
}
- public long getSleepTimeMillis() {
- return sleepTimeMillis;
- }
-
- public void setSleepTimeMillis(long sleepTimeMillis) {
- this.sleepTimeMillis = sleepTimeMillis;
- }
-
public int getNumberOfAggregatedRecords() {
return numberOfAggregatedRecords;
}
@@ -101,12 +72,4 @@ public class ShardMetricsReporter {
this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
}
- public int getMaxNumberOfRecordsPerFetch() {
- return maxNumberOfRecordsPerFetch;
- }
-
- public void setMaxNumberOfRecordsPerFetch(int maxNumberOfRecordsPerFetch) {
- this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
- }
-
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
new file mode 100644
index 0000000..e2090de
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPosition.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import javax.annotation.Nullable;
+
+import java.util.Date;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.isSentinelSequenceNumber;
+
+/**
+ * The position in which to start consuming from a stream.
+ */
+@Internal
+public class StartingPosition {
+
+ private final ShardIteratorType shardIteratorType;
+
+ private final Object startingMarker;
+
+ private StartingPosition(final ShardIteratorType shardIteratorType, @Nullable final Object startingMarker) {
+ this.shardIteratorType = shardIteratorType;
+ this.startingMarker = startingMarker;
+ }
+
+ public ShardIteratorType getShardIteratorType() {
+ return shardIteratorType;
+ }
+
+ @Nullable
+ public Object getStartingMarker() {
+ return startingMarker;
+ }
+
+ public static StartingPosition fromTimestamp(final Date date) {
+ return new StartingPosition(AT_TIMESTAMP, date);
+ }
+
+ /**
+ * Returns the starting position for the next record to consume from the given sequence number.
+ * The difference between {@code restartFromSequenceNumber()} and {@code continueFromSequenceNumber()} is that
+ * for {@code restartFromSequenceNumber()} aggregated records are reread to support subsequence failure.
+ *
+ * @param sequenceNumber the last successful sequence number, or sentinel marker
+ * @return the starting position to consume from
+ */
+ public static StartingPosition continueFromSequenceNumber(final SequenceNumber sequenceNumber) {
+ return fromSequenceNumber(sequenceNumber, false);
+ }
+
+ /**
+ * Returns the starting position to restart record consumption from the given sequence number after failure.
+ * The difference between {@code restartFromSequenceNumber()} and {@code continueFromSequenceNumber()} is that
+ * for {@code restartFromSequenceNumber()} aggregated records are reread to support subsequence failure.
+ *
+ * @param sequenceNumber the last successful sequence number, or sentinel marker
+ * @return the starting position to consume from
+ */
+ public static StartingPosition restartFromSequenceNumber(final SequenceNumber sequenceNumber) {
+ return fromSequenceNumber(sequenceNumber, true);
+ }
+
+ private static StartingPosition fromSequenceNumber(final SequenceNumber sequenceNumber, final boolean restart) {
+ if (isSentinelSequenceNumber(sequenceNumber)) {
+ return new StartingPosition(fromSentinelSequenceNumber(sequenceNumber), null);
+ } else {
+ // we will be starting from an actual sequence number (due to restore from failure).
+ return new StartingPosition(getShardIteratorType(sequenceNumber, restart), sequenceNumber.getSequenceNumber());
+ }
+ }
+
+ private static ShardIteratorType getShardIteratorType(final SequenceNumber sequenceNumber, final boolean restart) {
+ return restart && sequenceNumber.isAggregated() ? AT_SEQUENCE_NUMBER : AFTER_SEQUENCE_NUMBER;
+ }
+
+ private static ShardIteratorType fromSentinelSequenceNumber(final SequenceNumber sequenceNumber) {
+ if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+ return LATEST;
+ } else if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
+ return TRIM_HORIZON;
+ } else {
+ throw new IllegalArgumentException("Unexpected sentinel type: " + sequenceNumber);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index fe4ae32b..5f2d6d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -128,13 +128,13 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Maximum retry attempts for the get shard iterator operation. */
private final int getShardIteratorMaxRetries;
- /* Backoff millis for the describe stream operation. */
+ /** Backoff millis for the describe stream operation. */
private final long describeStreamBaseBackoffMillis;
- /* Maximum backoff millis for the describe stream operation. */
+ /** Maximum backoff millis for the describe stream operation. */
private final long describeStreamMaxBackoffMillis;
- /* Exponential backoff power constant for the describe stream operation. */
+ /** Exponential backoff power constant for the describe stream operation. */
private final double describeStreamExpConstant;
/**
@@ -148,61 +148,61 @@ public class KinesisProxy implements KinesisProxyInterface {
this.kinesisClient = createKinesisClient(configProps);
- this.listShardsBaseBackoffMillis = Long.valueOf(
+ this.listShardsBaseBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
- this.listShardsMaxBackoffMillis = Long.valueOf(
+ this.listShardsMaxBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
- this.listShardsExpConstant = Double.valueOf(
+ this.listShardsExpConstant = Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
- this.listShardsMaxRetries = Integer.valueOf(
+ this.listShardsMaxRetries = Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
- this.describeStreamBaseBackoffMillis = Long.valueOf(
+ this.describeStreamBaseBackoffMillis = Long.parseLong(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
- this.describeStreamMaxBackoffMillis = Long.valueOf(
+ this.describeStreamMaxBackoffMillis = Long.parseLong(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
- this.describeStreamExpConstant = Double.valueOf(
+ this.describeStreamExpConstant = Double.parseDouble(
configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
- this.getRecordsBaseBackoffMillis = Long.valueOf(
+ this.getRecordsBaseBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
- this.getRecordsMaxBackoffMillis = Long.valueOf(
+ this.getRecordsMaxBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
- this.getRecordsExpConstant = Double.valueOf(
+ this.getRecordsExpConstant = Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
- this.getRecordsMaxRetries = Integer.valueOf(
+ this.getRecordsMaxRetries = Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
- this.getShardIteratorBaseBackoffMillis = Long.valueOf(
+ this.getShardIteratorBaseBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
- this.getShardIteratorMaxBackoffMillis = Long.valueOf(
+ this.getShardIteratorMaxBackoffMillis = Long.parseLong(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
- this.getShardIteratorExpConstant = Double.valueOf(
+ this.getShardIteratorExpConstant = Double.parseDouble(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
- this.getShardIteratorMaxRetries = Integer.valueOf(
+ this.getShardIteratorMaxRetries = Integer.parseInt(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index b5926b1..6652529 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -21,6 +21,9 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
@@ -47,10 +50,13 @@ import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.deser.DeserializerFactory;
import java.io.IOException;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+
/**
* Some utilities specific to Amazon Web Service.
*/
@@ -240,4 +246,22 @@ public class AWSUtil {
}
}
+ /**
+ * Creates a {@link StartingPosition} from the given {@link SequenceNumber} and {@link Properties}.
+ * In the case we are restarting from a {@link SentinelSequenceNumber#SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM}, the date
+ * is parsed from the properties.
+ *
+ * @param sequenceNumber the sequence number to resume from
+ * @param configProps the properties to parse date from
+ * @return the starting position
+ */
+ public static StartingPosition getStartingPosition(final SequenceNumber sequenceNumber, final Properties configProps) {
+ if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
+ Date timestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(configProps);
+ return StartingPosition.fromTimestamp(timestamp);
+ } else {
+ return StartingPosition.restartFromSequenceNumber(sequenceNumber);
+ }
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index ace075e..6bd5548 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -31,10 +31,14 @@ import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -105,13 +109,13 @@ public class KinesisConfigUtil {
// specified initial timestamp in stream when using AT_TIMESTAMP
if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
- if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
+ if (!config.containsKey(STREAM_INITIAL_TIMESTAMP)) {
throw new IllegalArgumentException("Please set value for initial timestamp ('"
- + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
+ + STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
}
validateOptionalDateProperty(config,
- ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
- config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
+ STREAM_INITIAL_TIMESTAMP,
+ config.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
}
@@ -300,6 +304,26 @@ public class KinesisConfigUtil {
}
}
+ /**
+ * Parses the timestamp at which to start consuming from the stream, from the given properties.
+ *
+ * @param consumerConfig the properties to parse timestamp from
+ * @return the timestamp
+ */
+ public static Date parseStreamTimestampStartingPosition(final Properties consumerConfig) {
+ String timestamp = consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP);
+
+ try {
+ String format = consumerConfig.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+ SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+ return customDateFormat.parse(timestamp);
+ } catch (IllegalArgumentException | NullPointerException exception) {
+ throw new IllegalArgumentException(exception);
+ } catch (ParseException exception) {
+ return new Date((long) (Double.parseDouble(timestamp) * 1000));
+ }
+ }
+
private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
if (config.containsKey(key)) {
try {
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 2815193..478564b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -152,7 +152,7 @@ public class KinesisDataFetcherTest extends TestLogger {
// emitting a null (i.e., a corrupt record) should not produce any output, but still have the shard state updated
fetcher.emitRecordAndUpdateState(null, 10L, 1, new SequenceNumber("seq-num-2"));
assertEquals(new SequenceNumber("seq-num-2"), testShardStates.get(1).getLastProcessedSequenceNum());
- assertEquals(null, sourceContext.removeLatestOutput()); // no output should have been collected
+ assertNull(sourceContext.removeLatestOutput()); // no output should have been collected
}
@Test
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 6fbe4ee..b39b99e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -18,9 +18,13 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
@@ -28,6 +32,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavi
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Shard;
@@ -52,6 +57,7 @@ import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequen
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -61,12 +67,11 @@ import static org.mockito.Mockito.verify;
public class ShardConsumerTest {
@Test
- public void testMetricsReporting() {
+ public void testMetricsReporting() throws Exception {
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(500, 5, 500);
- ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
+ ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
assertEquals(500, metrics.getMillisBehindLatest());
- assertEquals(10000, metrics.getMaxNumberOfRecordsPerFetch());
}
@Test
@@ -105,7 +110,7 @@ public class ShardConsumerTest {
}
@Test
- public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+ public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() throws Exception {
KinesisProxyInterface kinesis = FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7, 500L);
// Get a total of 1000 records with 9 getRecords() calls,
@@ -114,7 +119,7 @@ public class ShardConsumerTest {
}
@Test
- public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
+ public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(SHARD_USE_ADAPTIVE_READS, "true");
@@ -133,7 +138,7 @@ public class ShardConsumerTest {
// Expecting to receive all messages
// 10 batches of 3 aggregated records each with 5 child records
// 10 * 3 * 5 = 150
- ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(150, kinesis, fakeSequenceNumber());
+ ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(150, kinesis, fakeSequenceNumber());
assertEquals(3, metrics.getNumberOfAggregatedRecords());
assertEquals(15, metrics.getNumberOfDeaggregatedRecords());
@@ -149,7 +154,7 @@ public class ShardConsumerTest {
// 5 batches of 1 aggregated record each with 10 child records
// Last consumed message was sub-sequence 5 (6/10) (zero based) (remaining are 6, 7, 8, 9)
// 5 * 1 * 10 - 6 = 44
- ShardMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(44, kinesis, sequenceNumber);
+ ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(44, kinesis, sequenceNumber);
assertEquals(1, metrics.getNumberOfAggregatedRecords());
assertEquals(10, metrics.getNumberOfDeaggregatedRecords());
@@ -160,19 +165,20 @@ public class ShardConsumerTest {
return new SequenceNumber("fakeStartingState");
}
- private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
- final int expectedNumberOfMessages,
- final KinesisProxyInterface kinesis,
- final SequenceNumber startingSequenceNumber) {
+ private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final KinesisProxyInterface kinesis,
+ final SequenceNumber startingSequenceNumber) throws Exception {
return assertNumberOfMessagesReceivedFromKinesis(expectedNumberOfMessages, kinesis, startingSequenceNumber, new Properties());
}
- private ShardMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
- final int expectedNumberOfMessages,
- final KinesisProxyInterface kinesis,
- final SequenceNumber startingSequenceNumber,
- final Properties consumerProperties) {
- ShardMetricsReporter shardMetricsReporter = new ShardMetricsReporter();
+ private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final KinesisProxyInterface kinesis,
+ final SequenceNumber startingSequenceNumber,
+ final Properties consumerProperties) throws Exception {
+ ShardConsumerMetricsReporter shardMetricsReporter = new ShardConsumerMetricsReporter(mock(MetricGroup.class));
+
StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
@@ -197,13 +203,20 @@ public class ShardConsumerTest {
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
Mockito.mock(KinesisProxyInterface.class));
+ final StreamShardHandle shardHandle = subscribedShardsStateUnderTest.get(0).getStreamShardHandle();
+ SequenceNumber lastProcessedSequenceNum = subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum();
+ StartingPosition startingPosition = AWSUtil.getStartingPosition(lastProcessedSequenceNum, consumerProperties);
+
+ final RecordPublisher recordPublisher = new PollingRecordPublisherFactory(config -> kinesis)
+ .create(startingPosition, fetcher.getConsumerConfiguration(), mock(MetricGroup.class), shardHandle);
+
int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
new ShardConsumer<>(
fetcher,
+ recordPublisher,
shardIndex,
- subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
- subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
- kinesis,
+ shardHandle,
+ lastProcessedSequenceNum,
shardMetricsReporter,
deserializationSchema)
.run();
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatchTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatchTest.java
new file mode 100644
index 0000000..2446093
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordBatchTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link RecordBatch}, covering the record counts, byte sizes and lag value
+ * the batch exposes for plain, aggregated and empty inputs.
+ */
+public class RecordBatchTest {
+
+    /** Lag value handed to every batch built by these tests. */
+    private static final long MILLIS_BEHIND_LATEST = 100L;
+
+    /** Non-aggregated records are counted once on both the aggregated and deaggregated side. */
+    @Test
+    public void testDeaggregateRecordsPassThrough() {
+        final List<Record> plainRecords = Arrays.asList(record("1"), record("2"), record("3"), record("4"));
+        final RecordBatch batch = new RecordBatch(plainRecords, createDummyStreamShardHandle(), MILLIS_BEHIND_LATEST);
+
+        assertEquals(4, batch.getAggregatedRecordSize());
+        assertEquals(4, batch.getDeaggregatedRecordSize());
+        // Four records, each built from a 32 character payload (see record(String)).
+        assertEquals(128, batch.getTotalSizeInBytes());
+        assertEquals(32, batch.getAverageRecordSizeBytes());
+    }
+
+    /** Aggregated records are expanded: 5 aggregated records yield 25 deaggregated ones. */
+    @Test
+    public void testDeaggregateRecordsWithAggregatedRecords() {
+        final List<Record> aggregatedRecords = TestUtils.createAggregatedRecordBatch(5, 5, new AtomicInteger());
+        final RecordBatch batch = new RecordBatch(aggregatedRecords, createDummyStreamShardHandle(), MILLIS_BEHIND_LATEST);
+
+        assertEquals(5, batch.getAggregatedRecordSize());
+        assertEquals(25, batch.getDeaggregatedRecordSize());
+        assertEquals(25 * 1024, batch.getTotalSizeInBytes());
+        assertEquals(1024, batch.getAverageRecordSizeBytes());
+    }
+
+    /** An empty batch reports zero for both counts and for the average record size. */
+    @Test
+    public void testGetAverageRecordSizeBytesEmptyList() {
+        final RecordBatch batch = new RecordBatch(emptyList(), createDummyStreamShardHandle(), MILLIS_BEHIND_LATEST);
+
+        assertEquals(0, batch.getAggregatedRecordSize());
+        assertEquals(0, batch.getDeaggregatedRecordSize());
+        assertEquals(0, batch.getAverageRecordSizeBytes());
+    }
+
+    /** The lag value passed to the constructor is returned unchanged. */
+    @Test
+    public void testGetMillisBehindLatest() {
+        final RecordBatch batch = new RecordBatch(singletonList(record("1")), createDummyStreamShardHandle(), MILLIS_BEHIND_LATEST);
+
+        assertEquals(Long.valueOf(MILLIS_BEHIND_LATEST), batch.getMillisBehindLatest());
+    }
+
+    /**
+     * Builds a Kinesis record with a random 32 character alphabetic payload.
+     *
+     * @param sequenceNumber sequence number to stamp on the record
+     * @return a record with partition key {@code "pk"} and the given sequence number
+     */
+    private static Record record(final String sequenceNumber) {
+        final String payload = RandomStringUtils.randomAlphabetic(32);
+        final ByteBuffer data = ByteBuffer.wrap(payload.getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+        return new Record()
+            .withData(data)
+            .withPartitionKey("pk")
+            .withSequenceNumber(sequenceNumber);
+    }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfigurationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfigurationTest.java
new file mode 100644
index 0000000..b091256
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherConfigurationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_GETRECORDS_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link PollingRecordPublisherConfiguration}.
+ *
+ * <p>Each test builds the configuration from a {@link Properties} instance and checks the
+ * value exposed by the matching getter.
+ */
+public class PollingRecordPublisherConfigurationTest {
+
+    /** With no properties set, the configuration reports the values asserted below as defaults. */
+    @Test
+    public void testDefaults() {
+        PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(new Properties());
+        // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+        assertEquals(200, configuration.getFetchIntervalMillis());
+        assertEquals(10000, configuration.getMaxNumberOfRecordsPerFetch());
+        assertFalse(configuration.isAdaptiveReads());
+    }
+
+    /** {@code SHARD_GETRECORDS_INTERVAL_MILLIS} drives the fetch interval. */
+    @Test
+    public void testGetFetchIntervalMillis() {
+        Properties properties = properties(SHARD_GETRECORDS_INTERVAL_MILLIS, "1");
+        PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(properties);
+
+        assertEquals(1, configuration.getFetchIntervalMillis());
+    }
+
+    /** {@code SHARD_GETRECORDS_MAX} drives the maximum number of records per fetch. */
+    @Test
+    public void testGetMaxNumberOfRecordsPerFetch() {
+        Properties properties = properties(SHARD_GETRECORDS_MAX, "2");
+        PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(properties);
+
+        assertEquals(2, configuration.getMaxNumberOfRecordsPerFetch());
+    }
+
+    /** {@code SHARD_USE_ADAPTIVE_READS} set to {@code "true"} enables adaptive reads. */
+    @Test
+    public void testIsAdaptiveReads() {
+        Properties properties = properties(SHARD_USE_ADAPTIVE_READS, "true");
+        PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(properties);
+
+        assertTrue(configuration.isAdaptiveReads());
+    }
+
+    /**
+     * Creates a {@link Properties} instance holding a single entry.
+     *
+     * @param key property name
+     * @param value property value
+     * @return a new properties object containing only the given pair
+     */
+    private Properties properties(final String key, final String value) {
+        final Properties properties = new Properties();
+        properties.setProperty(key, value);
+        return properties;
+    }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
new file mode 100644
index 0000000..c249db7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactoryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link PollingRecordPublisherFactory}, checking which publisher
+ * implementation the factory returns for a given consumer configuration.
+ */
+public class PollingRecordPublisherFactoryTest {
+
+    /** Factory under test; every proxy it is asked to create is a Mockito mock. */
+    private final PollingRecordPublisherFactory factory = new PollingRecordPublisherFactory(props -> mock(KinesisProxy.class));
+
+    /** Without adaptive reads configured, a plain polling publisher is produced. */
+    @Test
+    public void testBuildPollingRecordPublisher() throws Exception {
+        final RecordPublisher publisher = createRecordPublisher(new Properties());
+
+        assertTrue(publisher instanceof PollingRecordPublisher);
+        assertFalse(publisher instanceof AdaptivePollingRecordPublisher);
+    }
+
+    /** With {@code SHARD_USE_ADAPTIVE_READS} set to "true", the adaptive variant is produced. */
+    @Test
+    public void testBuildAdaptivePollingRecordPublisher() throws Exception {
+        final Properties adaptiveReadsConfig = new Properties();
+        adaptiveReadsConfig.setProperty(SHARD_USE_ADAPTIVE_READS, "true");
+
+        final RecordPublisher publisher = createRecordPublisher(adaptiveReadsConfig);
+
+        assertTrue(publisher instanceof PollingRecordPublisher);
+        assertTrue(publisher instanceof AdaptivePollingRecordPublisher);
+    }
+
+    /**
+     * Asks the factory for a publisher starting at the latest-sequence-number sentinel,
+     * using mocked metric group and shard handle.
+     *
+     * @param consumerConfig consumer properties passed to the factory
+     * @return the publisher created by the factory
+     * @throws Exception if the factory fails to create the publisher
+     */
+    private RecordPublisher createRecordPublisher(final Properties consumerConfig) throws Exception {
+        return factory.create(
+            StartingPosition.restartFromSequenceNumber(SENTINEL_LATEST_SEQUENCE_NUM.get()),
+            consumerConfig,
+            mock(MetricGroup.class),
+            mock(StreamShardHandle.class));
+    }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
new file mode 100644
index 0000000..2bbe3da
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordBatchConsumer;
+import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link PollingRecordPublisher}.
+ *
+ * <p>The publisher is driven against fake {@link KinesisProxyInterface} behaviours and a
+ * {@link TestConsumer} that captures every batch it is handed.
+ */
+public class PollingRecordPublisherTest {
+
+    /** Used by the constructor validation tests to declare the expected exception. */
+    @Rule
+    public final ExpectedException thrown = ExpectedException.none();
+
+    /** A single run hands exactly one batch, with all records and the lag value, to the consumer. */
+    @Test
+    public void testRunPublishesRecordsToConsumer() throws Exception {
+        KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 1, 100);
+        PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+        TestConsumer consumer = new TestConsumer();
+        recordPublisher.run(consumer);
+
+        assertEquals(1, consumer.recordBatches.size());
+        assertEquals(5, consumer.recordBatches.get(0).getDeaggregatedRecordSize());
+        // Exact Long comparison; avoids routing an integral value through the double/delta overload.
+        assertEquals(Long.valueOf(100L), consumer.recordBatches.get(0).getMillisBehindLatest());
+    }
+
+    /** The run result flips from INCOMPLETE to COMPLETE once the last batch has been read. */
+    @Test
+    public void testRunReturnsCompleteWhenShardExpires() throws Exception {
+        // There are 2 batches available in the stream
+        KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 2, 100);
+        PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+        // First call results in INCOMPLETE, there is one batch left
+        assertEquals(INCOMPLETE, recordPublisher.run(new TestConsumer()));
+
+        // After second call the shard is complete
+        assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+    }
+
+    /** Running again after the shard has been fully consumed keeps returning COMPLETE. */
+    @Test
+    public void testRunOnCompletelyConsumedShardReturnsComplete() throws Exception {
+        KinesisProxyInterface fakeKinesis = totalNumOfRecordsAfterNumOfGetRecordsCalls(5, 1, 100);
+        PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+        assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+        assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+    }
+
+    /** A run against the "no shards found" fake behaviour is reported as COMPLETE. */
+    @Test
+    public void testRunGetShardIteratorReturnsNullIsComplete() throws Exception {
+        KinesisProxyInterface fakeKinesis = FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour();
+        PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+        assertEquals(COMPLETE, recordPublisher.run(new TestConsumer()));
+    }
+
+    /** An expired iterator during getRecords leads to a second getShardIterator call. */
+    @Test
+    public void testRunGetRecordsRecoversFromExpiredIteratorException() throws Exception {
+        KinesisProxyInterface fakeKinesis = spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(2, 2, 1, 500));
+        PollingRecordPublisher recordPublisher = createPollingRecordPublisher(fakeKinesis);
+
+        recordPublisher.run(new TestConsumer());
+
+        // Get shard iterator is called twice, once during first run, secondly to refresh expired iterator
+        verify(fakeKinesis, times(2)).getShardIterator(any(), any(), any());
+    }
+
+    /** Passing -1 as the last constructor argument is rejected. */
+    @Test
+    public void validateExpiredIteratorBackoffMillisNegativeThrows() throws Exception {
+        thrown.expect(IllegalArgumentException.class);
+
+        new PollingRecordPublisher(
+            StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
+            TestUtils.createDummyStreamShardHandle(),
+            mock(PollingRecordPublisherMetricsReporter.class),
+            mock(KinesisProxyInterface.class),
+            100,
+            -1);
+    }
+
+    /** Passing 0 as the maximum number of records per fetch is rejected. */
+    @Test
+    public void validateMaxNumberOfRecordsPerFetchZeroThrows() throws Exception {
+        thrown.expect(IllegalArgumentException.class);
+
+        new PollingRecordPublisher(
+            StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
+            TestUtils.createDummyStreamShardHandle(),
+            mock(PollingRecordPublisherMetricsReporter.class),
+            mock(KinesisProxyInterface.class),
+            0,
+            100);
+    }
+
+    /**
+     * Creates a publisher that starts from the earliest-sequence-number sentinel on a dummy
+     * shard, reporting into a metrics reporter backed by a mocked {@link MetricGroup}.
+     *
+     * @param kinesis the (fake) Kinesis proxy the publisher reads from
+     * @return a publisher configured with 10000 records per fetch and the value 500 as last argument
+     * @throws Exception if the publisher cannot be constructed
+     */
+    PollingRecordPublisher createPollingRecordPublisher(final KinesisProxyInterface kinesis) throws Exception {
+        PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(mock(MetricGroup.class));
+
+        return new PollingRecordPublisher(
+            StartingPosition.restartFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()),
+            TestUtils.createDummyStreamShardHandle(),
+            metricsReporter,
+            kinesis,
+            10000,
+            500L);
+    }
+
+    /**
+     * Consumer that records every batch it receives and answers with the sequence number of
+     * the last deaggregated record seen so far.
+     */
+    private static class TestConsumer implements RecordBatchConsumer {
+        private final List<RecordBatch> recordBatches = new ArrayList<>();
+        // Stays null until a non-empty batch has been accepted.
+        private String latestSequenceNumber;
+
+        @Override
+        public SequenceNumber accept(final RecordBatch batch) {
+            recordBatches.add(batch);
+
+            // Empty batches leave the previously seen sequence number untouched.
+            if (batch.getDeaggregatedRecordSize() > 0) {
+                List<UserRecord> records = batch.getDeaggregatedRecords();
+                latestSequenceNumber = records.get(records.size() - 1).getSequenceNumber();
+            }
+
+            return new SequenceNumber(latestSequenceNumber);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporterTest.java
new file mode 100644
index 0000000..8cc2e17
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/PollingRecordPublisherMetricsReporterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.metrics;
+
+import org.apache.flink.metrics.MetricGroup;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link PollingRecordPublisherMetricsReporter}.
+ *
+ * <p>Checks which gauges are registered on the {@link MetricGroup} and that each setter's
+ * value is returned by the matching getter.
+ */
+public class PollingRecordPublisherMetricsReporterTest {
+
+ // Reporter under test, created by Mockito in setUp();
+ // presumably built through its MetricGroup constructor with the mock below - confirm against the reporter class.
+ @InjectMocks
+ private PollingRecordPublisherMetricsReporter metricsReporter;
+
+ // Mocked metric group on which gauge registrations are verified.
+ @Mock
+ private MetricGroup metricGroup;
+
+ // Initialises the annotated fields above before every test.
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ // No reporter method is invoked here, so the verified gauge registrations
+ // must already have happened while the reporter was being created in setUp().
+ @Test
+ public void testMetricIdentifiers() {
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.BYTES_PER_READ), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS), any());
+ }
+
+ // Each value written through a setter must be read back unchanged from its getter.
+ @Test
+ public void testGettersAndSetters() {
+ metricsReporter.setBytesPerRead(1);
+ metricsReporter.setLoopFrequencyHz(2);
+ metricsReporter.setMaxNumberOfRecordsPerFetch(3);
+ metricsReporter.setRunLoopTimeNanos(4);
+ metricsReporter.setSleepTimeMillis(5);
+
+ // The first two assertions use the delta overload (delta 0), i.e. they are compared as doubles.
+ assertEquals(1, metricsReporter.getBytesPerRead(), 0);
+ assertEquals(2, metricsReporter.getLoopFrequencyHz(), 0);
+ assertEquals(3, metricsReporter.getMaxNumberOfRecordsPerFetch());
+ assertEquals(4, metricsReporter.getRunLoopTimeNanos());
+ assertEquals(5, metricsReporter.getSleepTimeMillis());
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
new file mode 100644
index 0000000..33e6fd3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardConsumerMetricsReporterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.metrics;
+
+import org.apache.flink.metrics.MetricGroup;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link ShardConsumerMetricsReporter}.
+ *
+ * <p>Checks which gauges are registered on the {@link MetricGroup} and that each setter's
+ * value is returned by the matching getter.
+ */
+public class ShardConsumerMetricsReporterTest {
+
+ // Reporter under test, created by Mockito in setUp();
+ // presumably built through its MetricGroup constructor with the mock below - confirm against the reporter class.
+ @InjectMocks
+ private ShardConsumerMetricsReporter metricsReporter;
+
+ // Mocked metric group on which gauge registrations are verified.
+ @Mock
+ private MetricGroup metricGroup;
+
+ // Initialises the annotated fields above before every test.
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ // No reporter method is invoked here, so the verified gauge registrations
+ // must already have happened while the reporter was being created in setUp().
+ @Test
+ public void testMetricIdentifiers() {
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH), any());
+ verify(metricGroup).gauge(eq(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH), any());
+ }
+
+ // Each value written through a setter must be read back unchanged from its getter.
+ @Test
+ public void testGettersAndSetters() {
+ metricsReporter.setAverageRecordSizeBytes(1);
+ metricsReporter.setMillisBehindLatest(2);
+ metricsReporter.setNumberOfAggregatedRecords(3);
+ metricsReporter.setNumberOfDeaggregatedRecords(4);
+
+ assertEquals(1, metricsReporter.getAverageRecordSizeBytes());
+ assertEquals(2, metricsReporter.getMillisBehindLatest());
+ assertEquals(3, metricsReporter.getNumberOfAggregatedRecords());
+ assertEquals(4, metricsReporter.getNumberOfDeaggregatedRecords());
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPositionTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPositionTest.java
new file mode 100644
index 0000000..d2c507d
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/StartingPositionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for {@link StartingPosition}.
+ *
+ * <p>Each test builds a {@link StartingPosition} through one of its static factory methods and
+ * checks the resulting {@link ShardIteratorType} and starting marker.
+ */
+public class StartingPositionTest {
+
+ @Rule
+ public final ExpectedException thrown = ExpectedException.none();
+
+ /**
+ * A position created from a timestamp uses AT_TIMESTAMP and keeps the date as its marker.
+ */
+ @Test
+ public void testStartingPositionFromTimestamp() {
+ Date date = new Date();
+ StartingPosition position = StartingPosition.fromTimestamp(date);
+ assertEquals(ShardIteratorType.AT_TIMESTAMP, position.getShardIteratorType());
+ assertEquals(date, position.getStartingMarker());
+ }
+
+ /**
+ * Restarting from a sequence number built from the string alone uses AFTER_SEQUENCE_NUMBER.
+ */
+ @Test
+ public void testStartingPositionRestartFromSequenceNumber() {
+ SequenceNumber sequenceNumber = new SequenceNumber("100");
+ StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+ assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER, position.getShardIteratorType());
+ assertEquals("100", position.getStartingMarker());
+ }
+
+ /**
+ * Restarting from an aggregated sequence number (built here with a second constructor
+ * argument of 3) uses AT_SEQUENCE_NUMBER, with the plain sequence number as the marker.
+ */
+ @Test
+ public void testStartingPositionRestartFromAggregatedSequenceNumber() {
+ SequenceNumber sequenceNumber = new SequenceNumber("200", 3);
+ StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+ assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER, position.getShardIteratorType());
+ assertEquals("200", position.getStartingMarker());
+ }
+
+ /**
+ * Continuing from the same aggregated sequence number uses AFTER_SEQUENCE_NUMBER instead.
+ */
+ @Test
+ public void testStartingPositionContinueFromAggregatedSequenceNumber() {
+ SequenceNumber sequenceNumber = new SequenceNumber("200", 3);
+ StartingPosition position = StartingPosition.continueFromSequenceNumber(sequenceNumber);
+ assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER, position.getShardIteratorType());
+ assertEquals("200", position.getStartingMarker());
+ }
+
+ /**
+ * The "earliest" sentinel maps to TRIM_HORIZON and carries no starting marker.
+ */
+ @Test
+ public void testStartingPositionRestartFromSentinelEarliest() {
+ SequenceNumber sequenceNumber = SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
+ StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+ assertEquals(ShardIteratorType.TRIM_HORIZON, position.getShardIteratorType());
+ assertNull(position.getStartingMarker());
+ }
+
+ /**
+ * The "latest" sentinel maps to LATEST and carries no starting marker.
+ */
+ @Test
+ public void testStartingPositionRestartFromSentinelLatest() {
+ SequenceNumber sequenceNumber = SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get();
+ StartingPosition position = StartingPosition.restartFromSequenceNumber(sequenceNumber);
+ assertEquals(ShardIteratorType.LATEST, position.getShardIteratorType());
+ assertNull(position.getStartingMarker());
+ }
+
+ /**
+ * The "shard ending" sentinel is expected to be rejected with an
+ * {@link IllegalArgumentException}.
+ */
+ @Test
+ public void testStartingPositionRestartFromSentinelEnding() {
+ thrown.expect(IllegalArgumentException.class);
+
+ SequenceNumber sequenceNumber = SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get();
+ // The call is expected to throw, so no assertions follow it: they could never execute.
+ StartingPosition.restartFromSequenceNumber(sequenceNumber);
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 177a7cf..af3bdab 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -43,6 +43,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -121,10 +122,10 @@ public class FakeKinesisBehavioursFactory {
* @param numOfGetRecordsCalls the number batches available in the fake stream
*/
public static KinesisProxyInterface aggregatedRecords(
- final int numOfAggregatedRecords,
- final int numOfChildRecords,
- final int numOfGetRecordsCalls) {
- return new SingleShardEmittingAggregatedRecordsKinesis(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls);
+ final int numOfAggregatedRecords,
+ final int numOfChildRecords,
+ final int numOfGetRecordsCalls) {
+ return new SingleShardEmittingAggregatedRecordsKinesis(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls);
}
public static KinesisProxyInterface blockingQueueGetRecords(Map<String, List<BlockingQueue<String>>> streamsToShardQueues) {
@@ -335,14 +336,14 @@ public class FakeKinesisBehavioursFactory {
public SingleShardEmittingAggregatedRecordsKinesis(
final int numOfAggregatedRecords,
- final int numOfChildRecords,
- final int numOfGetRecordsCalls) {
+ final int numOfChildRecords,
+ final int numOfGetRecordsCalls) {
super(initShardItrToRecordBatch(numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls));
}
private static Map<String, List<Record>> initShardItrToRecordBatch(final int numOfAggregatedRecords,
- final int numOfChildRecords,
- final int numOfGetRecordsCalls) {
+ final int numOfChildRecords,
+ final int numOfGetRecordsCalls) {
Map<String, List<Record>> shardToRecordBatch = new HashMap<>();
@@ -415,9 +416,7 @@ public class FakeKinesisBehavioursFactory {
List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
for (int i = 0; i < shardCount; i++) {
shardsOfStream.add(
- new StreamShardHandle(
- streamName,
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
+ createDummyStreamShardHandle(streamName, KinesisShardIdGenerator.generateFromShardOrder(i)));
}
streamsWithListOfShards.put(streamName, shardsOfStream);
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
index 7fce762..7c5c786 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -19,12 +19,18 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import com.amazonaws.kinesis.agg.AggRecord;
import com.amazonaws.kinesis.agg.RecordAggregator;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
@@ -93,4 +99,21 @@ public class TestUtils {
return recordBatch;
}
+ /**
+ * Creates a dummy shard handle using a fixed stream name and shard id.
+ *
+ * @return a handle for shard "000000" of stream "stream-name"
+ */
+ public static StreamShardHandle createDummyStreamShardHandle() {
+ final String dummyStreamName = "stream-name";
+ final String dummyShardId = "000000";
+ return createDummyStreamShardHandle(dummyStreamName, dummyShardId);
+ }
+
+ /**
+ * Creates a dummy shard handle whose shard has fixed sequence number and hash key ranges.
+ *
+ * @param streamName name of the stream the shard belongs to
+ * @param shardId id assigned to the shard
+ * @return a handle wrapping the newly built shard
+ */
+ public static StreamShardHandle createDummyStreamShardHandle(final String streamName, final String shardId) {
+ // "FF" repeated 16 times, parsed as hexadecimal and rendered as a decimal string.
+ final String endingHashKey = new BigInteger(StringUtils.repeat("FF", 16), 16).toString();
+
+ final SequenceNumberRange sequenceNumberRange = new SequenceNumberRange()
+ .withStartingSequenceNumber("0")
+ .withEndingSequenceNumber("9999999999999");
+
+ final HashKeyRange hashKeyRange = new HashKeyRange()
+ .withStartingHashKey("0")
+ .withEndingHashKey(endingHashKey);
+
+ final Shard shard = new Shard()
+ .withSequenceNumberRange(sequenceNumberRange)
+ .withHashKeyRange(hashKeyRange)
+ .withShardId(shardId);
+
+ return new StreamShardHandle(streamName, shard);
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
index 47ea500..64fd5d1 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
@@ -29,9 +30,19 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Properties;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -80,4 +91,26 @@ public class AWSUtilTest {
public void testInvalidRegion() {
assertFalse(AWSUtil.isValidRegion("ur-east-1"));
}
+
+ /**
+ * The "latest" sentinel with an empty configuration yields a LATEST position without a marker.
+ */
+ @Test
+ public void testGetStartingPositionForLatest() {
+ final Properties emptyConfig = new Properties();
+
+ final StartingPosition startingPosition =
+ AWSUtil.getStartingPosition(SENTINEL_LATEST_SEQUENCE_NUM.get(), emptyConfig);
+
+ assertEquals(LATEST, startingPosition.getShardIteratorType());
+ assertNull(startingPosition.getStartingMarker());
+ }
+
+ /**
+ * The "at timestamp" sentinel yields an AT_TIMESTAMP position whose marker is the configured
+ * initial timestamp, parsed with the default date format.
+ */
+ @Test
+ public void testGetStartingPositionForTimestamp() throws Exception {
+ final String initialTimestamp = "2020-08-13T09:18:00.0+01:00";
+ final SimpleDateFormat defaultFormat = new SimpleDateFormat(DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+ final Date expected = defaultFormat.parse(initialTimestamp);
+
+ final Properties config = new Properties();
+ config.setProperty(STREAM_INITIAL_TIMESTAMP, initialTimestamp);
+
+ final StartingPosition startingPosition =
+ AWSUtil.getStartingPosition(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get(), config);
+
+ assertEquals(AT_TIMESTAMP, startingPosition.getShardIteratorType());
+ assertEquals(expected, startingPosition.getStartingMarker());
+ }
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index f589514..bd298d0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -30,8 +30,13 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Properties;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -41,6 +46,7 @@ import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(KinesisConfigUtil.class)
public class KinesisConfigUtilTest {
+
@Rule
private ExpectedException exception = ExpectedException.none();
@@ -509,4 +515,52 @@ public class KinesisConfigUtilTest {
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
+
+ /**
+ * With no custom date format configured, the initial timestamp is parsed using
+ * {@code DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT}.
+ */
+ @Test
+ public void testParseStreamTimestampStartingPositionUsingDefaultFormat() throws Exception {
+ String timestamp = "2020-08-13T09:18:00.0+01:00";
+ Date expectedTimestamp = new SimpleDateFormat(DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT).parse(timestamp);
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+
+ Date actualTimestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(consumerProperties);
+
+ assertEquals(expectedTimestamp, actualTimestamp);
+ }
+
+ /**
+ * When {@code STREAM_TIMESTAMP_DATE_FORMAT} is configured, the initial timestamp is parsed
+ * using that format.
+ */
+ @Test
+ public void testParseStreamTimestampStartingPositionUsingCustomFormat() throws Exception {
+ String format = "yyyy-MM-dd'T'HH:mm";
+ String timestamp = "2020-08-13T09:23";
+ Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp);
+
+ Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+ consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
+
+ Date actualTimestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(consumerProperties);
+
+ assertEquals(expectedTimestamp, actualTimestamp);
+ }
+
+ /**
+ * An initial timestamp of "bad" is expected to fail with a {@link NumberFormatException}.
+ * NOTE(review): the expected exception type suggests the value is also tried as a number
+ * once date parsing fails - confirm against KinesisConfigUtil.
+ */
+ @Test
+ public void testParseStreamTimestampStartingPositionUsingParseError() {
+ final Properties invalidConfig = new Properties();
+ invalidConfig.setProperty(STREAM_INITIAL_TIMESTAMP, "bad");
+
+ exception.expect(NumberFormatException.class);
+
+ KinesisConfigUtil.parseStreamTimestampStartingPosition(invalidConfig);
+ }
+
+ /**
+ * A configuration without any initial timestamp is expected to fail with an
+ * {@link IllegalArgumentException}.
+ */
+ @Test
+ public void testParseStreamTimestampStartingPositionIllegalArgumentException() {
+ final Properties emptyConfig = new Properties();
+
+ exception.expect(IllegalArgumentException.class);
+
+ KinesisConfigUtil.parseStreamTimestampStartingPosition(emptyConfig);
+ }
+
}