You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/21 08:33:01 UTC
[flink] 01/03: [FLINK-18515][Kinesis] Adding FanOutRecordPublisher
for Kinesis EFO support
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bbcd0c791371c2c6b3e477a83adfbd78dbee2602
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Fri Sep 4 10:35:35 2020 +0100
[FLINK-18515][Kinesis] Adding FanOutRecordPublisher for Kinesis EFO support
This closes #13189.
---
flink-connectors/flink-connector-kinesis/pom.xml | 1 +
.../kinesis/config/ConsumerConfigConstants.java | 15 +-
.../internals/DynamoDBStreamsDataFetcher.java | 3 +-
.../kinesis/internals/KinesisDataFetcher.java | 100 +++--
.../kinesis/internals/ShardConsumer.java | 13 +-
.../publisher/RecordPublisherFactory.java | 7 +
.../publisher/fanout/FanOutRecordPublisher.java | 246 +++++++++++
.../fanout/FanOutRecordPublisherConfiguration.java | 2 +-
.../FanOutRecordPublisherFactory.java} | 77 ++--
.../publisher/fanout/FanOutShardSubscriber.java | 468 +++++++++++++++++++++
.../publisher/polling/PollingRecordPublisher.java | 5 -
.../polling/PollingRecordPublisherFactory.java | 2 +-
.../kinesis/proxy/FullJitterBackoff.java | 61 +++
.../connectors/kinesis/proxy/KinesisProxy.java | 41 +-
.../kinesis/proxy/KinesisProxyInterface.java | 1 +
.../connectors/kinesis/proxy/KinesisProxyV2.java | 19 +-
.../kinesis/proxy/KinesisProxyV2Interface.java | 14 +
.../streaming/connectors/kinesis/util/AWSUtil.java | 12 +-
.../connectors/kinesis/util/AwsV2Util.java | 36 +-
.../connectors/kinesis/util/KinesisConfigUtil.java | 3 +
.../kinesis/FlinkKinesisConsumerTest.java | 10 +-
.../kinesis/internals/KinesisDataFetcherTest.java | 45 +-
.../kinesis/internals/ShardConsumerFanOutTest.java | 242 +++++++++++
.../kinesis/internals/ShardConsumerTest.java | 111 +----
.../kinesis/internals/ShardConsumerTestUtils.java | 129 ++++++
.../FanOutRecordPublisherConfigurationTest.java | 1 -
.../fanout/FanOutRecordPublisherTest.java | 443 +++++++++++++++++++
.../polling/PollingRecordPublisherTest.java | 30 +-
.../kinesis/proxy/KinesisProxyV2Test.java | 60 +++
.../FakeKinesisFanOutBehavioursFactory.java | 391 +++++++++++++++++
.../connectors/kinesis/testutils/TestUtils.java | 41 ++
.../testutils/TestableKinesisDataFetcher.java | 39 +-
...inesisDataFetcherForShardConsumerException.java | 4 +-
.../connectors/kinesis/util/AWSUtilTest.java | 8 +-
.../connectors/kinesis/util/AwsV2UtilTest.java | 38 +-
.../kinesis/util/KinesisConfigUtilTest.java | 35 +-
36 files changed, 2484 insertions(+), 269 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 2a96598..9d22d8c 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -103,6 +103,7 @@ under the License.
<scope>test</scope>
</dependency>
+ <!-- Amazon AWS SDK v1.x dependencies -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index dde4821..f003b3b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -224,6 +224,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
/** The maximum number of records that will be buffered before suspending consumption of a shard. */
public static final String WATERMARK_SYNC_QUEUE_CAPACITY = "flink.watermark.sync.queue.capacity";
+ public static final String EFO_HTTP_CLIENT_MAX_CONCURRENCY = "flink.stream.efo.http-client.max-concurrency";
+
// ------------------------------------------------------------------------
// Default values for consumer configuration
// ------------------------------------------------------------------------
@@ -272,7 +274,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final double DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
- public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 5;
+ public static final int DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES = 10;
public static final long DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE = 1000L;
@@ -308,10 +310,21 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;
+ public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY = 10_000;
+
/**
* To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
* getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
*/
public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+ /**
+ * Build the key of an EFO consumer ARN according to a stream name.
+ * @param streamName the stream name the key is built upon.
+ * @return a key of EFO consumer ARN.
+ */
+ public static String efoConsumerArn(final String streamName) {
+ return EFO_CONSUMER_ARN_PREFIX + "." + streamName;
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
index afa1c28..fbf0d01 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -70,7 +70,8 @@ public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
// use DynamoDBStreamsProxy
- DynamoDBStreamsProxy::create);
+ DynamoDBStreamsProxy::create,
+ null);
}
@Override
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 2be9b1c..133a4d3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,8 +27,10 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
@@ -41,8 +43,11 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
@@ -56,6 +61,9 @@ import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -74,6 +82,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.POLLING;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -183,6 +193,9 @@ public class KinesisDataFetcher<T> {
/** The Kinesis proxy factory that will be used to create instances for discovery and shard consumers. */
private final FlinkKinesisProxyFactory kinesisProxyFactory;
+ /** The Kinesis proxy V2 factory that will be used to create instances for EFO shard consumers. */
+ private final FlinkKinesisProxyV2Factory kinesisProxyV2Factory;
+
/** The Kinesis proxy that the fetcher will be using to discover new shards. */
private final KinesisProxyInterface kinesis;
@@ -243,6 +256,13 @@ public class KinesisDataFetcher<T> {
}
/**
+ * Factory to create Kinesis proxy V2 instances used by a fetcher.
+ */
+ public interface FlinkKinesisProxyV2Factory {
+ KinesisProxyV2Interface create(Properties configProps);
+ }
+
+ /**
* The wrapper that holds the watermark handling related parameters
* of a record produced by the shard consumer thread.
*
@@ -318,14 +338,15 @@ public class KinesisDataFetcher<T> {
* @param configProps the consumer configuration properties
* @param deserializationSchema deserialization schema
*/
- public KinesisDataFetcher(List<String> streams,
- SourceFunction.SourceContext<T> sourceContext,
- RuntimeContext runtimeContext,
- Properties configProps,
- KinesisDeserializationSchema<T> deserializationSchema,
- KinesisShardAssigner shardAssigner,
- AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
- WatermarkTracker watermarkTracker) {
+ public KinesisDataFetcher(
+ final List<String> streams,
+ final SourceFunction.SourceContext<T> sourceContext,
+ final RuntimeContext runtimeContext,
+ final Properties configProps,
+ final KinesisDeserializationSchema<T> deserializationSchema,
+ final KinesisShardAssigner shardAssigner,
+ final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+ final WatermarkTracker watermarkTracker) {
this(streams,
sourceContext,
sourceContext.getCheckpointLock(),
@@ -338,23 +359,26 @@ public class KinesisDataFetcher<T> {
new AtomicReference<>(),
new ArrayList<>(),
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
- KinesisProxy::create);
+ KinesisProxy::create,
+ KinesisDataFetcher::createKinesisProxyV2);
}
@VisibleForTesting
- protected KinesisDataFetcher(List<String> streams,
- SourceFunction.SourceContext<T> sourceContext,
- Object checkpointLock,
- RuntimeContext runtimeContext,
- Properties configProps,
- KinesisDeserializationSchema<T> deserializationSchema,
- KinesisShardAssigner shardAssigner,
- AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
- WatermarkTracker watermarkTracker,
- AtomicReference<Throwable> error,
- List<KinesisStreamShardState> subscribedShardsState,
- HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
- FlinkKinesisProxyFactory kinesisProxyFactory) {
+ protected KinesisDataFetcher(
+ final List<String> streams,
+ final SourceFunction.SourceContext<T> sourceContext,
+ final Object checkpointLock,
+ final RuntimeContext runtimeContext,
+ final Properties configProps,
+ final KinesisDeserializationSchema<T> deserializationSchema,
+ final KinesisShardAssigner shardAssigner,
+ final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner,
+ final WatermarkTracker watermarkTracker,
+ final AtomicReference<Throwable> error,
+ final List<KinesisStreamShardState> subscribedShardsState,
+ final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
+ final FlinkKinesisProxyFactory kinesisProxyFactory,
+ @Nullable final FlinkKinesisProxyV2Factory kinesisProxyV2Factory) {
this.streams = checkNotNull(streams);
this.configProps = checkNotNull(configProps);
this.sourceContext = checkNotNull(sourceContext);
@@ -367,6 +391,7 @@ public class KinesisDataFetcher<T> {
this.periodicWatermarkAssigner = periodicWatermarkAssigner;
this.watermarkTracker = watermarkTracker;
this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
+ this.kinesisProxyV2Factory = kinesisProxyV2Factory;
this.kinesis = kinesisProxyFactory.create(configProps);
this.recordPublisherFactory = createRecordPublisherFactory();
@@ -379,6 +404,7 @@ public class KinesisDataFetcher<T> {
this.shardConsumersExecutor =
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+
this.recordEmitter = createRecordEmitter(configProps);
}
@@ -402,11 +428,11 @@ public class KinesisDataFetcher<T> {
* @return shard consumer
*/
protected ShardConsumer<T> createShardConsumer(
- Integer subscribedShardStateIndex,
- StreamShardHandle subscribedShard,
- SequenceNumber lastSequenceNum,
- MetricGroup metricGroup,
- KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
+ final Integer subscribedShardStateIndex,
+ final StreamShardHandle subscribedShard,
+ final SequenceNumber lastSequenceNum,
+ final MetricGroup metricGroup,
+ final KinesisDeserializationSchema<T> shardDeserializer) throws InterruptedException {
return new ShardConsumer<>(
this,
@@ -418,8 +444,17 @@ public class KinesisDataFetcher<T> {
shardDeserializer);
}
- private RecordPublisherFactory createRecordPublisherFactory() {
- return new PollingRecordPublisherFactory(kinesisProxyFactory);
+ protected RecordPublisherFactory createRecordPublisherFactory() {
+ RecordPublisherType recordPublisherType = RecordPublisherType.valueOf(
+ configProps.getProperty(RECORD_PUBLISHER_TYPE, POLLING.name()));
+
+ switch (recordPublisherType) {
+ case EFO:
+ return new FanOutRecordPublisherFactory(kinesisProxyV2Factory.create(configProps));
+ case POLLING:
+ default:
+ return new PollingRecordPublisherFactory(kinesisProxyFactory);
+ }
}
protected RecordPublisher createRecordPublisher(
@@ -432,6 +467,11 @@ public class KinesisDataFetcher<T> {
return recordPublisherFactory.create(startingPosition, configProps, metricGroup, subscribedShard);
}
+ private static KinesisProxyV2Interface createKinesisProxyV2(final Properties configProps) {
+ final KinesisAsyncClient client = AwsV2Util.createKinesisAsyncClient(configProps);
+ return new KinesisProxyV2(client);
+ }
+
/**
* Starts the fetcher. After starting the fetcher, it can only
* be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
@@ -672,6 +712,8 @@ public class KinesisDataFetcher<T> {
LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
}
shardConsumersExecutor.shutdownNow();
+
+ recordPublisherFactory.close();
}
/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index d9c0d9d..5bf0b09 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -28,6 +28,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -51,6 +53,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
@Internal
public class ShardConsumer<T> implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+
private final KinesisDeserializationSchema<T> deserializer;
private final int subscribedShardStateIndex;
@@ -102,6 +107,11 @@ public class ShardConsumer<T> implements Runnable {
try {
while (isRunning()) {
final RecordPublisherRunResult result = recordPublisher.run(batch -> {
+ if (!batch.getDeaggregatedRecords().isEmpty()) {
+ LOG.debug("stream: {}, shard: {}, millis behind latest: {}, batch size: {}",
+ subscribedShard.getStreamName(), subscribedShard.getShard().getShardId(),
+ batch.getMillisBehindLatest(), batch.getDeaggregatedRecordSize());
+ }
for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
if (filterDeaggregatedRecord(userRecord)) {
deserializeRecordForCollectionAndUpdateState(userRecord);
@@ -118,7 +128,6 @@ public class ShardConsumer<T> implements Runnable {
if (result == COMPLETE) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
// we can close this consumer thread once we've reached the end of the subscribed shard
break;
}
@@ -188,7 +197,7 @@ public class ShardConsumer<T> implements Runnable {
* This method is to support restarting from a partially consumed aggregated sequence number.
*
* @param record the record to filter
- * @return {@code true} if the record should be retained
+ * @return true if the record should be retained
*/
private boolean filterDeaggregatedRecord(final UserRecord record) {
if (!lastSequenceNum.isAggregated()) {
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
index 0dcd0cb..672dc38 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
@@ -45,4 +45,11 @@ public interface RecordPublisherFactory {
MetricGroup metricGroup,
StreamShardHandle streamShardHandle) throws InterruptedException;
+ /**
+ * Destroy any open resources used by the factory.
+ */
+ default void close() {
+ // Do nothing by default
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
new file mode 100644
index 0000000..2174029
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.EncryptionType;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static software.amazon.awssdk.services.kinesis.model.StartingPosition.builder;
+
+/**
+ * A {@link RecordPublisher} that will read and forward records from Kinesis using EFO, to the subscriber.
+ * Records are consumed via Enhanced Fan Out subscriptions using SubscribeToShard API.
+ */
+@Internal
+public class FanOutRecordPublisher implements RecordPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FanOutRecordPublisher.class);
+
+ private final FullJitterBackoff backoff;
+
+ private final String consumerArn;
+
+ private final KinesisProxyV2Interface kinesisProxy;
+
+ private final StreamShardHandle subscribedShard;
+
+ private final FanOutRecordPublisherConfiguration configuration;
+
+ /** The current attempt in the case of subsequent recoverable errors. */
+ private int attempt = 0;
+
+ private StartingPosition nextStartingPosition;
+
+ /**
+ * Instantiate a new FanOutRecordPublisher.
+ * Consumes data from KDS using EFO SubscribeToShard over AWS SDK V2.x
+ *
+ * @param startingPosition the position in the shard to start consuming from
+ * @param consumerArn the consumer ARN of the stream consumer
+ * @param subscribedShard the shard to consume from
+ * @param kinesisProxy the proxy used to talk to Kinesis services
+ * @param configuration the record publisher configuration
+ */
+ public FanOutRecordPublisher(
+ final StartingPosition startingPosition,
+ final String consumerArn,
+ final StreamShardHandle subscribedShard,
+ final KinesisProxyV2Interface kinesisProxy,
+ final FanOutRecordPublisherConfiguration configuration,
+ final FullJitterBackoff backoff) {
+ this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
+ this.consumerArn = Preconditions.checkNotNull(consumerArn);
+ this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
+ this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
+ this.configuration = Preconditions.checkNotNull(configuration);
+ this.backoff = Preconditions.checkNotNull(backoff);
+ }
+
+ @Override
+ public RecordPublisherRunResult run(final RecordBatchConsumer recordConsumer) throws InterruptedException {
+ LOG.info("Running fan out record publisher on {}::{} from {} - {}",
+ subscribedShard.getStreamName(),
+ subscribedShard.getShard().getShardId(),
+ nextStartingPosition.getShardIteratorType(),
+ nextStartingPosition.getStartingMarker());
+
+ Consumer<SubscribeToShardEvent> eventConsumer = event -> {
+ RecordBatch recordBatch = new RecordBatch(toSdkV1Records(event.records()), subscribedShard, event.millisBehindLatest());
+ SequenceNumber sequenceNumber = recordConsumer.accept(recordBatch);
+ nextStartingPosition = StartingPosition.continueFromSequenceNumber(sequenceNumber);
+ };
+
+ RecordPublisherRunResult result = runWithBackoff(eventConsumer);
+
+ LOG.info("Subscription expired {}::{}, with status {}",
+ subscribedShard.getStreamName(),
+ subscribedShard.getShard().getShardId(),
+ result);
+
+ return result;
+ }
+
+ /**
+ * Runs the record publisher, sleeping for a computed full-jitter backoff period when certain recoverable exceptions occur.
+ * Unrecoverable exceptions are thrown to terminate the application.
+ *
+ * @param eventConsumer the consumer to pass events to
+ * @return {@code COMPLETE} if the shard is complete and this shard consumer should exit
+ * @throws InterruptedException
+ */
+ private RecordPublisherRunResult runWithBackoff(
+ final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException {
+ FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber(
+ consumerArn,
+ subscribedShard.getShard().getShardId(),
+ kinesisProxy);
+ boolean complete;
+
+ try {
+ complete = fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
+ toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
+ attempt = 0;
+ } catch (FanOutSubscriberException ex) {
+ // We have received an error from the network layer
+ // This can be due to limits being exceeded, network timeouts, etc
+ // We should backoff, reacquire a subscription and try again
+ if (ex.getCause() instanceof ResourceNotFoundException) {
+ LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered." +
+ "Marking this shard as complete {} ({})", subscribedShard.getShard().getShardId(), consumerArn);
+
+ return COMPLETE;
+ }
+
+ if (attempt == configuration.getSubscribeToShardMaxRetries()) {
+ throw new RuntimeException("Maximum reties exceeded for SubscribeToShard. " +
+ "Failed " + configuration.getSubscribeToShardMaxRetries() + " times.");
+ }
+
+ attempt++;
+ backoff(ex);
+ return INCOMPLETE;
+ }
+
+ return complete ? COMPLETE : INCOMPLETE;
+ }
+
+ private void backoff(final Throwable ex) throws InterruptedException {
+ long backoffMillis = backoff.calculateFullJitterBackoff(
+ configuration.getSubscribeToShardBaseBackoffMillis(),
+ configuration.getSubscribeToShardMaxBackoffMillis(),
+ configuration.getSubscribeToShardExpConstant(),
+ attempt);
+
+ LOG.warn("Encountered recoverable error {}. Backing off for {} millis {} ({})",
+ ex.getCause().getClass().getSimpleName(),
+ backoffMillis,
+ subscribedShard.getShard().getShardId(),
+ consumerArn,
+ ex);
+
+ backoff.sleep(backoffMillis);
+ }
+
+ /**
+ * Records that come from KPL may be aggregated.
+ * Records must be deaggregated before they are processed by the application.
+ * Deaggregation is performed by KCL.
+ * In order to prevent having to import KCL 1.x and 2.x we convert the records to v1 format and use KCL v1.
+ *
+ * @param records the SDK v2 records
+ * @return records converted to SDK v1 format
+ */
+ private List<com.amazonaws.services.kinesis.model.Record> toSdkV1Records(final List<Record> records) {
+ final List<com.amazonaws.services.kinesis.model.Record> sdkV1Records = new ArrayList<>();
+
+ for (Record record : records) {
+ sdkV1Records.add(toSdkV1Record(record));
+ }
+
+ return sdkV1Records;
+ }
+
+ private com.amazonaws.services.kinesis.model.Record toSdkV1Record(@Nonnull final Record record) {
+ final com.amazonaws.services.kinesis.model.Record recordV1 = new com.amazonaws.services.kinesis.model.Record()
+ .withData(record.data().asByteBuffer())
+ .withSequenceNumber(record.sequenceNumber())
+ .withPartitionKey(record.partitionKey())
+ .withApproximateArrivalTimestamp(new Date(record.approximateArrivalTimestamp().toEpochMilli()));
+
+ EncryptionType encryptionType = record.encryptionType();
+ if (encryptionType != null) {
+ recordV1.withEncryptionType(encryptionType.name());
+ }
+
+ return recordV1;
+ }
+
+ /**
+ * Converts a local {@link StartingPosition} to an AWS SDK V2 object representation.
+ *
+ * @param startingPosition the local {@link StartingPosition}
+ * @return an AWS SDK V2 representation
+ */
+ private software.amazon.awssdk.services.kinesis.model.StartingPosition toSdkV2StartingPosition(StartingPosition startingPosition) {
+ software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder = builder()
+ .type(startingPosition.getShardIteratorType().toString());
+
+ Object marker = startingPosition.getStartingMarker();
+
+ switch (startingPosition.getShardIteratorType()) {
+ case AT_TIMESTAMP: {
+ Preconditions.checkNotNull(marker, "StartingPosition AT_TIMESTAMP date marker is null.");
+ builder.timestamp(((Date) marker).toInstant());
+ break;
+ }
+ case AT_SEQUENCE_NUMBER:
+ case AFTER_SEQUENCE_NUMBER: {
+ Preconditions.checkNotNull(marker, "StartingPosition *_SEQUENCE_NUMBER position is null.");
+ builder.sequenceNumber(marker.toString());
+ break;
+ }
+ }
+
+ return builder.build();
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
index 03705cf..89ffad3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
@@ -124,7 +124,7 @@ public class FanOutRecordPublisherConfiguration {
private final long describeStreamBaseBackoffMillis;
/**
- * Maximum backoff millis for the describe stream operation.
+ * Maximum backoff millis for the describe stream operation.
*/
private final long describeStreamMaxBackoffMillis;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherFactory.java
similarity index 52%
copy from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
copy to flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherFactory.java
index ee5034f..f21bfdc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherFactory.java
@@ -15,74 +15,81 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.FlinkKinesisProxyFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
-import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+import java.util.Optional;
import java.util.Properties;
+import static java.util.Collections.singletonList;
+
/**
- * A {@link RecordPublisher} factory used to create instances of {@link PollingRecordPublisher}.
+ * A {@link RecordPublisher} factory used to create instances of {@link FanOutRecordPublisher}.
*/
@Internal
-public class PollingRecordPublisherFactory implements RecordPublisherFactory {
+public class FanOutRecordPublisherFactory implements RecordPublisherFactory {
- private final FlinkKinesisProxyFactory kinesisProxyFactory;
+ private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
- public PollingRecordPublisherFactory(FlinkKinesisProxyFactory kinesisProxyFactory) {
- this.kinesisProxyFactory = kinesisProxyFactory;
+ /**
+ * A singleton {@link KinesisProxyV2} is used per Flink task.
+ * The {@link KinesisAsyncClient} uses an internal thread pool; using a single client reduces overhead.
+ */
+ private final KinesisProxyV2Interface kinesisProxy;
+
+ /**
+ * Instantiate a factory responsible for creating {@link FanOutRecordPublisher}.
+ *
+ * @param kinesisProxy the singleton proxy used by all record publishers created by this factory
+ */
+ public FanOutRecordPublisherFactory(final KinesisProxyV2Interface kinesisProxy) {
+ this.kinesisProxy = kinesisProxy;
}
/**
- * Create a {@link PollingRecordPublisher}.
- * An {@link AdaptivePollingRecordPublisher} will be created should adaptive reads be enabled in the configuration.
+ * Create a {@link FanOutRecordPublisher}.
*
- * @param startingPosition the position in the shard to start consuming records from
+ * @param startingPosition the starting position in the shard to start consuming from
* @param consumerConfig the consumer configuration properties
* @param metricGroup the metric group to report metrics to
* @param streamShardHandle the shard this consumer is subscribed to
- * @return a {@link PollingRecordPublisher}
+ * @return a {@link FanOutRecordPublisher}
*/
@Override
- public PollingRecordPublisher create(
+ public FanOutRecordPublisher create(
final StartingPosition startingPosition,
final Properties consumerConfig,
final MetricGroup metricGroup,
- final StreamShardHandle streamShardHandle) throws InterruptedException {
+ final StreamShardHandle streamShardHandle) {
Preconditions.checkNotNull(startingPosition);
Preconditions.checkNotNull(consumerConfig);
Preconditions.checkNotNull(metricGroup);
Preconditions.checkNotNull(streamShardHandle);
- final PollingRecordPublisherConfiguration configuration = new PollingRecordPublisherConfiguration(consumerConfig);
- final PollingRecordPublisherMetricsReporter metricsReporter = new PollingRecordPublisherMetricsReporter(metricGroup);
- final KinesisProxyInterface kinesisProxy = kinesisProxyFactory.create(consumerConfig);
+ String stream = streamShardHandle.getStreamName();
+ FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(consumerConfig, singletonList(stream));
+
+ Optional<String> streamConsumerArn = configuration.getStreamConsumerArn(stream);
+ Preconditions.checkState(streamConsumerArn.isPresent());
+
+ return new FanOutRecordPublisher(startingPosition, streamConsumerArn.get(), streamShardHandle, kinesisProxy, configuration, BACKOFF);
+ }
- if (configuration.isAdaptiveReads()) {
- return new AdaptivePollingRecordPublisher(
- startingPosition,
- streamShardHandle,
- metricsReporter,
- kinesisProxy,
- configuration.getMaxNumberOfRecordsPerFetch(),
- configuration.getFetchIntervalMillis());
- } else {
- return new PollingRecordPublisher(
- startingPosition,
- streamShardHandle,
- metricsReporter,
- kinesisProxy,
- configuration.getMaxNumberOfRecordsPerFetch(),
- configuration.getFetchIntervalMillis());
- }
+ @Override
+ public void close() {
+ kinesisProxy.close();
}
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
new file mode 100644
index 0000000..0cc1eaf
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do not propagate up to the application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- | | --- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ * Three types of message are passed over the queue for inter-thread communication:
+ * <ul>
+ * <li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li>
+ * <li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li>
+ * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li>
+ * </ul>
+ * </p>
+ * <p>
+ * The blocking queue has a maximum capacity of 1 record.
+ * This allows backpressure to be applied closer to the network stack and results in record prefetch.
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class):
+ * <ul>
+ * <li>1 event being processed by the consumer</li>
+ * <li>1 event enqueued in the blocking queue</li>
+ * <li>1 event being added to the queue by the network (blocking)</li>
+ * </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+ /**
+ * The maximum capacity of the queue between the network and consumption thread.
+ * The queue is mainly used to isolate networking from consumption such that errors do not bubble up.
+ * This queue also acts as a buffer resulting in a record prefetch and reduced latency.
+ */
+ private static final int QUEUE_CAPACITY = 1;
+
+ /**
+ * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected error states.
+ * If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+ * Under normal conditions heartbeat events are received even when there are no records to consume, so it is not
+ * expected for this timeout to occur under normal conditions.
+ */
+ private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+ /** The time to wait when enqueuing events to allow error events to "push in front" of data. */
+ private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+ private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+ private final KinesisProxyV2Interface kinesis;
+
+ private final String consumerArn;
+
+ private final String shardId;
+
+ /**
+ * Create a new Fan Out subscriber.
+ *
+ * @param consumerArn the stream consumer ARN
+ * @param shardId the shard ID to subscribe to
+ * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+ */
+ FanOutShardSubscriber(final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) {
+ this.kinesis = Preconditions.checkNotNull(kinesis);
+ this.consumerArn = Preconditions.checkNotNull(consumerArn);
+ this.shardId = Preconditions.checkNotNull(shardId);
+ }
+
+ /**
+ * Obtains a subscription to the shard from the specified {@code startingPosition}.
+ * {@link SubscribeToShardEvent} received from KDS are delivered to the given {@code eventConsumer}.
+ * Returns {@code false} when the subscription expires before the shard is fully consumed,
+ * indicating the caller should obtain a subsequent subscription.
+ *
+ * @param startingPosition the position in the stream in which to start receiving records
+ * @param eventConsumer the consumer to deliver received events to
+ * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated from the networking stack
+ * @throws InterruptedException when the thread is interrupted
+ */
+ boolean subscribeToShardAndConsumeRecords(
+ final StartingPosition startingPosition,
+ final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException {
+ LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+ try {
+ openSubscriptionToShard(startingPosition);
+ } catch (FanOutSubscriberException ex) {
+ // The only exception that should cause a failure is a ResourceNotFoundException
+ // Rethrow the exception to trigger the application to terminate
+ if (ex.getCause() instanceof ResourceNotFoundException) {
+ throw (ResourceNotFoundException) ex.getCause();
+ }
+
+ throw ex;
+ }
+
+ return consumeAllRecordsFromKinesisShard(eventConsumer);
+ }
+
+ /**
+ * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a subscription.
+ * In the event a non-recoverable error occurs this method will rethrow the exception.
+ * Once the subscription is acquired the client signals to the producer that we are ready to receive records.
+ *
+ * @param startingPosition the position in which to start consuming from
+ * @throws FanOutSubscriberException when an exception is propagated from the networking stack
+ */
+ private void openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException {
+ SubscribeToShardRequest request = SubscribeToShardRequest.builder()
+ .consumerARN(consumerArn)
+ .shardId(shardId)
+ .startingPosition(startingPosition)
+ .build();
+
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+ CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+ FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch);
+
+ SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
+ .builder()
+ .onError(e -> {
+ // Errors that occur while trying to acquire a subscription are only thrown from here
+ // Errors that occur during the subscription are surfaced here and to the FanOutShardSubscription
+ // (errors are ignored here once the subscription is open)
+ if (waitForSubscriptionLatch.getCount() > 0) {
+ exception.set(e);
+ waitForSubscriptionLatch.countDown();
+ }
+ })
+ .subscriber(() -> subscription)
+ .build();
+
+ kinesis.subscribeToShard(request, responseHandler);
+
+ waitForSubscriptionLatch.await();
+
+ Throwable throwable = exception.get();
+ if (throwable != null) {
+ handleError(throwable);
+ }
+
+ LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
+
+ // Request the first record to kick off consumption
+ // Following requests are made by the FanOutShardSubscription on the netty thread
+ subscription.requestRecord();
+ }
+
+ /**
+ * Unwraps the given networking error and rethrows it as a {@link FanOutSubscriberException}.
+ * {@link CompletionException} and {@link ExecutionException} wrappers are stripped first so the
+ * caller sees the root cause.
+ *
+ * @param throwable the exception that has occurred
+ * @throws FanOutSubscriberException always, wrapping the unwrapped cause
+ */
+ private void handleError(final Throwable throwable) throws FanOutSubscriberException {
+ Throwable cause;
+ if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
+ cause = throwable.getCause();
+ } else {
+ cause = throwable;
+ }
+
+ LOG.warn("Error occurred on EFO subscription: {} - ({}). {} ({})",
+ throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause);
+
+ throw new FanOutSubscriberException(cause);
+ }
+
+ /**
+ * Once the subscription is open, records will be delivered to the {@link BlockingQueue}.
+ * Queue capacity is hardcoded to 1 record, the queue is used solely to separate consumption and processing.
+ * However, this buffer will result in latency reduction as records are pre-fetched as a result.
+ * This method will poll the queue and exit under any of these conditions:
+ * - {@code continuationSequenceNumber} is {@code null}, indicating the shard is complete
+ * - The subscription expires, indicated by a {@link SubscriptionCompleteEvent}
+ * - There is an error while consuming records, indicated by a {@link SubscriptionErrorEvent}
+ *
+ * @param eventConsumer the event consumer to deliver records to
+ * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained
+ * @throws FanOutSubscriberException when an exception is propagated from the networking stack
+ * @throws InterruptedException when the thread is interrupted
+ */
+ private boolean consumeAllRecordsFromKinesisShard(
+ final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException {
+ String continuationSequenceNumber;
+
+ do {
+ // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup
+ FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+ if (subscriptionEvent == null) {
+ LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn);
+ return false;
+ } else if (subscriptionEvent.isSubscribeToShardEvent()) {
+ SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
+ continuationSequenceNumber = event.continuationSequenceNumber();
+ if (!event.records().isEmpty()) {
+ eventConsumer.accept(event);
+ }
+ } else if (subscriptionEvent.isSubscriptionComplete()) {
+ // The subscription is complete, but the shard might not be, so we return incomplete
+ return false;
+ } else {
+ handleError(subscriptionEvent.getThrowable());
+ return false;
+ }
+ } while (continuationSequenceNumber != null);
+
+ return true;
+ }
+
+ /**
+ * The {@link FanOutShardSubscription} subscribes to the events coming from KDS and adds them to the {@link BlockingQueue}.
+ * Backpressure is applied based on the maximum capacity of the queue.
+ * The {@link Subscriber} methods of this class are invoked by a thread from the {@link KinesisAsyncClient}.
+ */
+ private class FanOutShardSubscription implements Subscriber<SubscribeToShardEventStream> {
+
+ private Subscription subscription;
+
+ private volatile boolean cancelled = false;
+
+ private final CountDownLatch waitForSubscriptionLatch;
+
+ private final Object lockObject = new Object();
+
+ private FanOutShardSubscription(final CountDownLatch waitForSubscriptionLatch) {
+ this.waitForSubscriptionLatch = waitForSubscriptionLatch;
+ }
+
+ /**
+ * Flag to the producer that we are ready to receive more events.
+ */
+ void requestRecord() {
+ if (!cancelled) {
+ LOG.debug("Requesting more records from EFO subscription - {} ({})", shardId, consumerArn);
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+ waitForSubscriptionLatch.countDown();
+ }
+
+ @Override
+ public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
+ subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() {
+ @Override
+ public void visit(SubscribeToShardEvent event) {
+ synchronized (lockObject) {
+ if (enqueueEventWithRetry(new SubscriptionNextEvent(event))) {
+ requestRecord();
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOG.debug("Error occurred on EFO subscription: {} - ({}). {} ({})",
+ throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn);
+
+ // Cancel the subscription to signal the onNext to stop queuing and requesting data
+ cancelSubscription();
+
+ synchronized (lockObject) {
+ // Empty the queue and add a poison pill to terminate this subscriber
+ // The synchronized block ensures that new data is not written in the meantime
+ queue.clear();
+ enqueueEvent(new SubscriptionErrorEvent(throwable));
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ LOG.debug("EFO subscription complete - {} ({})", shardId, consumerArn);
+ enqueueEvent(new SubscriptionCompleteEvent());
+ }
+
+ private void cancelSubscription() {
+ if (!cancelled) {
+ cancelled = true;
+ subscription.cancel();
+ }
+ }
+
+ /**
+ * Continuously attempt to enqueue an event until successful or the subscription is cancelled (due to error).
+ * When backpressure applied by the consumer exceeds 30s for a single batch, a ReadTimeoutException will be
+ * thrown by the network stack. This will result in the subscription being cancelled and this event being discarded.
+ * The subscription would subsequently be reacquired and the discarded data would be fetched again.
+ *
+ * @param event the event to enqueue
+ * @return true if the event was successfully enqueued.
+ */
+ private boolean enqueueEventWithRetry(final FanOutSubscriptionEvent event) {
+ boolean result = false;
+ do {
+ if (cancelled) {
+ break;
+ }
+
+ synchronized (lockObject) {
+ result = enqueueEvent(event);
+ }
+ } while (!result);
+
+ return result;
+ }
+
+ /**
+ * Offers the event to the queue.
+ *
+ * @param event the event to enqueue
+ * @return true if the event was successfully enqueued.
+ */
+ private boolean enqueueEvent(final FanOutSubscriptionEvent event) {
+ try {
+ if (!queue.offer(event, ENQUEUE_WAIT_SECONDS, SECONDS)) {
+ LOG.debug("Timed out enqueuing event {} - {} ({})", event.getClass().getSimpleName(), shardId, consumerArn);
+ return false;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * An exception wrapper to indicate an error has been thrown from the networking stack.
+ */
+ static class FanOutSubscriberException extends Exception {
+
+ private static final long serialVersionUID = 2275015497000437736L;
+
+ public FanOutSubscriberException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ /**
+ * An interface used to pass messages between {@link FanOutShardSubscription} and {@link FanOutShardSubscriber}
+ * via the {@link BlockingQueue}.
+ */
+ private interface FanOutSubscriptionEvent {
+
+ default boolean isSubscribeToShardEvent() {
+ return false;
+ }
+
+ default boolean isSubscriptionComplete() {
+ return false;
+ }
+
+ default SubscribeToShardEvent getSubscribeToShardEvent() {
+ throw new UnsupportedOperationException("This event does not support getSubscribeToShardEvent()");
+ }
+
+ default Throwable getThrowable() {
+ throw new UnsupportedOperationException("This event does not support getThrowable()");
+ }
+ }
+
+ /**
+ * Indicates that an EFO subscription has completed/expired.
+ */
+ private static class SubscriptionCompleteEvent implements FanOutSubscriptionEvent {
+
+ @Override
+ public boolean isSubscriptionComplete() {
+ return true;
+ }
+ }
+
+ /**
+ * Poison pill, indicates that an error occurred while consuming from KDS.
+ */
+ private static class SubscriptionErrorEvent implements FanOutSubscriptionEvent {
+ private final Throwable throwable;
+
+ private SubscriptionErrorEvent(Throwable throwable) {
+ this.throwable = throwable;
+ }
+
+ @Override
+ public Throwable getThrowable() {
+ return throwable;
+ }
+ }
+
+ /**
+ * A wrapper to pass the next {@link SubscribeToShardEvent} between threads.
+ */
+ private static class SubscriptionNextEvent implements FanOutSubscriptionEvent {
+ private final SubscribeToShardEvent subscribeToShardEvent;
+
+ private SubscriptionNextEvent(SubscribeToShardEvent subscribeToShardEvent) {
+ this.subscribeToShardEvent = subscribeToShardEvent;
+ }
+
+ @Override
+ public boolean isSubscribeToShardEvent() {
+ return true;
+ }
+
+ @Override
+ public SubscribeToShardEvent getSubscribeToShardEvent() {
+ return subscribeToShardEvent;
+ }
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
index 4edc1f0..36d3c69 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
@@ -156,10 +155,6 @@ public class PollingRecordPublisher implements RecordPublisher {
*/
@Nullable
private String getShardIterator() throws InterruptedException {
- if (nextStartingPosition.getShardIteratorType() == LATEST && subscribedShard.isClosed()) {
- return null;
- }
-
return kinesisProxy.getShardIterator(
subscribedShard,
nextStartingPosition.getShardIteratorType().toString(),
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
index ee5034f..00680e8 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherFactory.java
@@ -38,7 +38,7 @@ public class PollingRecordPublisherFactory implements RecordPublisherFactory {
private final FlinkKinesisProxyFactory kinesisProxyFactory;
- public PollingRecordPublisherFactory(FlinkKinesisProxyFactory kinesisProxyFactory) {
+ public PollingRecordPublisherFactory(final FlinkKinesisProxyFactory kinesisProxyFactory) {
this.kinesisProxyFactory = kinesisProxyFactory;
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/FullJitterBackoff.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/FullJitterBackoff.java
new file mode 100644
index 0000000..5d2ffa7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/FullJitterBackoff.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Random;
+
+/**
+ * Used to calculate full jitter backoff sleep durations.
+ * @see <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">
+ * Exponential Backoff and Jitter
+ * </a>
+ */
+@Internal
+public class FullJitterBackoff {
+
+ /** Random seed used to calculate backoff jitter for Kinesis operations. */
+ private final Random seed = new Random();
+
+ /**
+ * Calculates the sleep time for full jitter based on the given parameters.
+ *
+ * @param baseMillis the base backoff time in milliseconds
+ * @param maxMillis the maximum backoff time in milliseconds
+ * @param power the power constant for exponential backoff
+ * @param attempt the attempt number
+ * @return the time to wait before trying again
+ */
+ public long calculateFullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
+ long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
+ return (long) (seed.nextDouble() * exponentialBackoff);
+ }
+
+ /**
+ * Puts the current thread to sleep for the specified number of millis.
+ * Simply delegates to {@link Thread#sleep}.
+ *
+ * @param millisToSleep the number of milliseconds to sleep for
+ * @throws InterruptedException if the current thread is interrupted while sleeping
+ */
+ public void sleep(long millisToSleep) throws InterruptedException {
+ Thread.sleep(millisToSleep);
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 5f2d6d5..b8d3086 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -55,7 +55,6 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -74,12 +73,12 @@ public class KinesisProxy implements KinesisProxyInterface {
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+ /** Calculates full jitter backoff delays. */
+ private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
+
/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
private final AmazonKinesis kinesisClient;
- /** Random seed used to calculate backoff jitter for Kinesis operations. */
- private static final Random seed = new Random();
-
// ------------------------------------------------------------------------
// listShards() related performance settings
// ------------------------------------------------------------------------
@@ -206,7 +205,6 @@ public class KinesisProxy implements KinesisProxyInterface {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
-
}
/**
@@ -230,9 +228,6 @@ public class KinesisProxy implements KinesisProxyInterface {
return new KinesisProxy(configProps);
}
- /**
- * {@inheritDoc}
- */
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
@@ -247,11 +242,11 @@ public class KinesisProxy implements KinesisProxyInterface {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (SdkClientException ex) {
if (isRecoverableSdkClientException(ex)) {
- long backoffMillis = fullJitterBackoff(
+ long backoffMillis = BACKOFF.calculateFullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
LOG.warn("Got recoverable SdkClientException. Backing off for "
+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
- Thread.sleep(backoffMillis);
+ BACKOFF.sleep(backoffMillis);
} else {
throw ex;
}
@@ -266,9 +261,6 @@ public class KinesisProxy implements KinesisProxyInterface {
return getRecordsResult;
}
- /**
- * {@inheritDoc}
- */
@Override
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
GetShardListResult result = new GetShardListResult();
@@ -281,9 +273,6 @@ public class KinesisProxy implements KinesisProxyInterface {
return result;
}
- /**
- * {@inheritDoc}
- */
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
@@ -322,11 +311,11 @@ public class KinesisProxy implements KinesisProxyInterface {
getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
- long backoffMillis = fullJitterBackoff(
+ long backoffMillis = BACKOFF.calculateFullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
- Thread.sleep(backoffMillis);
+ BACKOFF.sleep(backoffMillis);
} else {
throw ex;
}
@@ -438,11 +427,11 @@ public class KinesisProxy implements KinesisProxyInterface {
listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
- long backoffMillis = fullJitterBackoff(
+ long backoffMillis = BACKOFF.calculateFullJitterBackoff(
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+ ". Backing off for " + backoffMillis + " millis.");
- Thread.sleep(backoffMillis);
+ BACKOFF.sleep(backoffMillis);
} catch (ResourceInUseException reInUse) {
if (LOG.isWarnEnabled()) {
// List Shards will throw an exception if stream in not in active state. Return and re-use previous state available.
@@ -459,11 +448,11 @@ public class KinesisProxy implements KinesisProxyInterface {
break;
} catch (SdkClientException ex) {
if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
- long backoffMillis = fullJitterBackoff(
+ long backoffMillis = BACKOFF.calculateFullJitterBackoff(
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
streamName, backoffMillis);
- Thread.sleep(backoffMillis);
+ BACKOFF.sleep(backoffMillis);
} else {
// propagate if retries exceeded or not recoverable
// (otherwise would return null result and keep trying forever)
@@ -515,14 +504,14 @@ public class KinesisProxy implements KinesisProxyInterface {
try {
describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
} catch (LimitExceededException le) {
- long backoffMillis = fullJitterBackoff(
+ long backoffMillis = BACKOFF.calculateFullJitterBackoff(
describeStreamBaseBackoffMillis,
describeStreamMaxBackoffMillis,
describeStreamExpConstant,
attemptCount++);
LOG.warn(String.format("Got LimitExceededException when describing stream %s. "
+ "Backing off for %d millis.", streamName, backoffMillis));
- Thread.sleep(backoffMillis);
+ BACKOFF.sleep(backoffMillis);
} catch (ResourceNotFoundException re) {
throw new RuntimeException("Error while getting stream details", re);
}
@@ -541,8 +530,4 @@ public class KinesisProxy implements KinesisProxyInterface {
return describeStreamResult;
}
- protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
- long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
- return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
- }
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 30464f3..db9c7ca 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -71,4 +71,5 @@ public interface KinesisProxyInterface {
* if the backoff is interrupted.
*/
GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
index d1310e5..26908ce 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -21,6 +21,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.CompletableFuture;
/**
* Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
@@ -30,10 +34,11 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ /** An Asynchronous client used to communicate with AWS services. */
private final KinesisAsyncClient kinesisAsyncClient;
/**
- * Create a new KinesisProxyV2 based on the supplied configuration properties.
+ * Create a new KinesisProxyV2 using the provided Async Client.
*
* @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
*/
@@ -41,4 +46,16 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface {
this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient);
}
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ final SubscribeToShardRequest request,
+ final SubscribeToShardResponseHandler responseHandler) {
+ return kinesisAsyncClient.subscribeToShard(request, responseHandler);
+ }
+
+ @Override
+ public void close() {
+ kinesisAsyncClient.close();
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
index aff6a85..e748eb2 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
@@ -19,10 +19,24 @@ package org.apache.flink.streaming.connectors.kinesis.proxy;
import org.apache.flink.annotation.Internal;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.CompletableFuture;
+
/**
* Interface for a Kinesis proxy using AWS SDK v2.x operating on multiple Kinesis streams within the same AWS service region.
*/
@Internal
public interface KinesisProxyV2Interface {
+ CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler);
+
+ /**
+ * Close any open resources used by this proxy.
+ */
+ default void close() {
+ // Do nothing by default
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 7301e7a..43e47b7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -56,6 +56,7 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
/**
* Some utilities specific to Amazon Web Service.
@@ -277,7 +278,16 @@ public class AWSUtil {
* @return the starting position
*/
public static StartingPosition getStartingPosition(final SequenceNumber sequenceNumber, final Properties configProps) {
- if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
+ if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+ // LATEST starting positions are translated to AT_TIMESTAMP starting positions. This is to prevent data loss
+ // in the situation where the first read times out and is re-attempted. Consider the following scenario:
+ // 1. Consume from LATEST
+ // 2. No records are consumed and Record Publisher throws retryable error
+ // 3. Restart consumption from LATEST
+ // Any records sent between steps 1 and 3 are lost. Using the timestamp of step 1 allows the consumer to
+ // restart from shard position of step 1, and hence no records are lost.
+ return StartingPosition.fromTimestamp(new Date());
+ } else if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
Date timestamp = KinesisConfigUtil.parseStreamTimestampStartingPosition(configProps);
return StartingPosition.fromTimestamp(timestamp);
} else {
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index c4073c3..2326314 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -34,7 +34,9 @@ import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.profiles.ProfileFile;
import software.amazon.awssdk.regions.Region;
@@ -50,12 +52,19 @@ import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
+
/**
* Utility methods specific to Amazon Web Service SDK v2.x.
*/
@Internal
public class AwsV2Util {
+ private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB
+ private static final Duration HEALTH_CHECK_PING_PERIOD = Duration.ofSeconds(60);
+ private static final Duration CONNECTION_ACQUISITION_TIMEOUT = Duration.ofSeconds(60);
+
/**
* Creates an Amazon Kinesis Async Client from the provided properties.
* Configuration is copied from AWS SDK v1 configuration class as per:
@@ -65,8 +74,8 @@ public class AwsV2Util {
* @return a new Amazon Kinesis Client
*/
public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
- final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
- return createKinesisAsyncClient(configProps, config);
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ return createKinesisAsyncClient(configProps, clientConfiguration);
}
/**
@@ -79,7 +88,7 @@ public class AwsV2Util {
* @return a new Amazon Kinesis Client
*/
public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps, final ClientConfiguration config) {
- final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder());
+ final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder(), configProps);
final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration(config, ClientOverrideConfiguration.builder());
final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
@@ -89,13 +98,27 @@ public class AwsV2Util {
@VisibleForTesting
static SdkAsyncHttpClient createHttpClient(
final ClientConfiguration config,
- final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ final NettyNioAsyncHttpClient.Builder httpClientBuilder,
+ final Properties consumerConfig) {
+
+ int maxConcurrency = Optional
+ .ofNullable(consumerConfig.getProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY))
+ .map(Integer::parseInt)
+ .orElse(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY);
+
httpClientBuilder
- .maxConcurrency(config.getMaxConnections())
+ .maxConcurrency(maxConcurrency)
.connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
.writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
.connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
- .useIdleConnectionReaper(config.useReaper());
+ .useIdleConnectionReaper(config.useReaper())
+ .protocol(Protocol.HTTP2)
+ .connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT)
+ .http2Configuration(Http2Configuration
+ .builder()
+ .healthCheckPingPeriod(HEALTH_CHECK_PING_PERIOD)
+ .initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
+ .build());
if (config.getConnectionTTL() > -1) {
httpClientBuilder.connectionTimeToLive(Duration.ofMillis(config.getConnectionTTL()));
@@ -248,4 +271,5 @@ public class AwsV2Util {
public static Region getRegion(final Properties configProps) {
return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
}
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index fc512e7..9626478 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -247,6 +247,9 @@ public class KinesisConfigUtil {
ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
);
}
+
+ validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY,
+ "Invalid value given for EFO HTTP client max concurrency. Must be positive.");
}
/**
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 061120b..8f77d45 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -748,8 +748,9 @@ public class FlinkKinesisConsumerTest extends TestLogger {
new AtomicReference<>(),
new ArrayList<>(),
subscribedStreamsToLastDiscoveredShardIds,
- (props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap)
- ) {};
+ (props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap),
+ null) {
+ };
return fetcher;
}
};
@@ -880,9 +881,8 @@ public class FlinkKinesisConsumerTest extends TestLogger {
new AtomicReference<>(),
new ArrayList<>(),
subscribedStreamsToLastDiscoveredShardIds,
- (props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
- streamToQueueMap)
- ) {
+ (props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamToQueueMap),
+ null) {
@Override
protected void emitWatermark() {
// necessary in this test to ensure that watermark state is updated
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 478564b..f6de864 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -26,11 +26,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.testutils.AlwaysThrowsDeserializationSchema;
@@ -67,12 +69,16 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -114,7 +120,7 @@ public class KinesisDataFetcherTest extends TestLogger {
final TestSourceContext<String> sourceContext = new TestSourceContext<>();
final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
- Collections.singletonList(stream),
+ singletonList(stream),
sourceContext,
TestUtils.getStandardProperties(),
new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
@@ -761,7 +767,7 @@ public class KinesisDataFetcherTest extends TestLogger {
final KinesisDataFetcher<String> fetcher =
new TestableKinesisDataFetcher<String>(
- Collections.singletonList(fakeStream1),
+ singletonList(fakeStream1),
sourceContext,
new java.util.Properties(),
new KinesisDeserializationSchemaWrapper<>(new org.apache.flink.streaming.util.serialization.SimpleStringSchema()),
@@ -825,14 +831,14 @@ public class KinesisDataFetcherTest extends TestLogger {
Map<String, List<BlockingQueue<String>>> streamsToShardQueues = new HashMap<>();
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
queue.put("item1");
- streamsToShardQueues.put(stream, Collections.singletonList(queue));
+ streamsToShardQueues.put(stream, singletonList(queue));
AlwaysThrowsDeserializationSchema deserializationSchema = new AlwaysThrowsDeserializationSchema();
KinesisProxyInterface fakeKinesis =
FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamsToShardQueues);
TestableKinesisDataFetcherForShardConsumerException<String> fetcher = new TestableKinesisDataFetcherForShardConsumerException<>(
- Collections.singletonList(stream),
+ singletonList(stream),
new TestSourceContext<>(),
TestUtils.getStandardProperties(),
new KinesisDeserializationSchemaWrapper<>(deserializationSchema),
@@ -841,7 +847,8 @@ public class KinesisDataFetcherTest extends TestLogger {
new AtomicReference<>(),
new LinkedList<>(),
new HashMap<>(),
- fakeKinesis);
+ fakeKinesis,
+ (sequence, properties, metricGroup, streamShardHandle) -> mock(RecordPublisher.class));
DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
TestUtils.getStandardProperties(), fetcher, 1, 0);
@@ -881,4 +888,32 @@ public class KinesisDataFetcherTest extends TestLogger {
assertTrue("Expected Fetcher to have been interrupted. This test didn't accomplish its goal.",
fetcher.wasInterrupted);
}
+
+ @Test
+ public void testRecordPublisherFactoryIsTornDown() {
+ Properties config = TestUtils.getStandardProperties();
+ config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+
+ KinesisProxyV2Interface kinesisV2 = mock(KinesisProxyV2Interface.class);
+
+ TestableKinesisDataFetcher<String> fetcher =
+ new TestableKinesisDataFetcher<String>(
+ singletonList("fakeStream1"),
+ new TestSourceContext<>(),
+ config,
+ new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+ 10,
+ 2,
+ new AtomicReference<>(),
+ new LinkedList<>(),
+ new HashMap<>(),
+ mock(KinesisProxyInterface.class),
+ kinesisV2) {
+ };
+
+ fetcher.shutdownFetcher();
+
+ verify(kinesisV2).close();
+ }
+
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
new file mode 100644
index 0000000..c9ae709
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SingleShardFanOutKinesisV2;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.efoProperties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+
+/**
+ * Tests for the {@link ShardConsumer} using Fan Out consumption with mocked Kinesis behaviours.
+ */
+public class ShardConsumerFanOutTest {
+
+ @Test
+ public void testEmptyShard() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.emptyShard();
+
+ assertNumberOfMessagesReceivedFromKinesis(0, kinesis, fakeSequenceNumber());
+
+ assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
+ }
+
+ @Test
+ public void testStartFromLatestIsTranslatedToTimestamp() throws Exception {
+ Instant now = Instant.now();
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.boundedShard().build();
+ SequenceNumber sequenceNumber = SENTINEL_LATEST_SEQUENCE_NUM.get();
+
+ // Fake behaviour defaults to 10 messages
+ assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, efoProperties());
+
+ StartingPosition actual = kinesis.getStartingPositionForSubscription(0);
+ assertEquals(AT_TIMESTAMP, actual.type());
+ assertTrue(now.equals(actual.timestamp()) || now.isBefore(actual.timestamp()));
+ }
+
+ @Test
+ public void testStartFromLatestReceivesNoRecordsContinuesToUseTimestamp() throws Exception {
+ AbstractSingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+ SequenceNumber sequenceNumber = SENTINEL_LATEST_SEQUENCE_NUM.get();
+
+ // Fake behaviour defaults to 10 messages
+ assertNumberOfMessagesReceivedFromKinesis(1, kinesis, sequenceNumber, efoProperties());
+
+ // This fake Kinesis will give 2 subscriptions
+ assertEquals(2, kinesis.getNumberOfSubscribeToShardInvocations());
+
+ assertEquals(AT_TIMESTAMP, kinesis.getStartingPositionForSubscription(0).type());
+ assertEquals(AT_TIMESTAMP, kinesis.getStartingPositionForSubscription(1).type());
+ }
+
+ @Test
+ public void testBoundedShardConsumesFromTimestamp() throws Exception {
+ String format = "yyyy-MM-dd'T'HH:mm";
+ String timestamp = "2020-07-02T09:14";
+ Instant expectedTimestamp = new SimpleDateFormat(format).parse(timestamp).toInstant();
+
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.boundedShard().build();
+
+ Properties consumerConfig = efoProperties();
+ consumerConfig.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp);
+ consumerConfig.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format);
+ SequenceNumber sequenceNumber = SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();
+
+ // Fake behaviour defaults to 10 messages
+ assertNumberOfMessagesReceivedFromKinesis(10, kinesis, sequenceNumber, consumerConfig);
+
+ StartingPosition actual = kinesis.getStartingPositionForSubscription(0);
+ assertEquals(AT_TIMESTAMP, actual.type());
+ assertEquals(expectedTimestamp, actual.timestamp());
+ }
+
+ @Test
+ public void testMillisBehindReported() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withMillisBehindLatest(123L)
+ .build();
+
+ // Fake behaviour defaults to 10 messages
+ ShardConsumerMetricsReporter metrics = assertNumberOfMessagesReceivedFromKinesis(10, kinesis, fakeSequenceNumber());
+
+ assertEquals(123L, metrics.getMillisBehindLatest());
+ }
+
+ @Test
+ public void testBoundedShardConsumesCorrectNumberOfMessages() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(10)
+ .withRecordsPerBatch(5)
+ .build();
+
+ // 10 batches of 5 records = 50
+ assertNumberOfMessagesReceivedFromKinesis(50, kinesis, fakeSequenceNumber());
+
+ assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
+ }
+
+ @Test
+ public void testBoundedShardResubscribesToShard() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(100)
+ .withRecordsPerBatch(10)
+ .withBatchesPerSubscription(5)
+ .build();
+
+ // 100 batches of 10 records = 1000
+ assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, fakeSequenceNumber());
+
+ // 100 batches / 5 batches per subscription = 20 subscriptions
+ assertEquals(20, kinesis.getNumberOfSubscribeToShardInvocations());
+
+ // Starting from non-aggregated sequence number means we should start AFTER the sequence number
+ assertEquals(AFTER_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testBoundedShardWithAggregatedRecords() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(100)
+ .withRecordsPerBatch(10)
+ .withAggregationFactor(100)
+ .build();
+
+ // 100 batches of 10 records * 100 aggregation factor = 100000
+ assertNumberOfMessagesReceivedFromKinesis(100000, kinesis, fakeSequenceNumber());
+ }
+
+ @Test
+ public void testBoundedShardResumingConsumptionFromAggregatedSubsequenceNumber() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(10)
+ .withRecordsPerBatch(1)
+ .withAggregationFactor(10)
+ .build();
+
+ SequenceNumber subsequenceNumber = new SequenceNumber("1", 5);
+
+ // 10 batches of 1 record * 10 aggregation factor - 6 previously consumed subsequence records (0,1,2,3,4,5) = 94
+ assertNumberOfMessagesReceivedFromKinesis(94, kinesis, subsequenceNumber);
+
+ // Starting from aggregated sequence number means we should start AT the sequence number
+ assertEquals(AT_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testSubscribeToShardUsesCorrectStartingSequenceNumbers() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(10)
+ .withRecordsPerBatch(1)
+ .withBatchesPerSubscription(2)
+ .build();
+
+ // 10 batches of 1 records = 10
+ assertNumberOfMessagesReceivedFromKinesis(10, kinesis, new SequenceNumber("0"));
+
+ // 10 batches / 2 batches per subscription = 5 subscriptions
+ assertEquals(5, kinesis.getNumberOfSubscribeToShardInvocations());
+
+ // Starting positions should correlate to the last consumed sequence number
+ assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(0), "0");
+ assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(1), "2");
+ assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(2), "4");
+ assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(3), "6");
+ assertStartingPositionAfterSequenceNumber(kinesis.getStartingPositionForSubscription(4), "8");
+ }
+
+ private void assertStartingPositionAfterSequenceNumber(
+ final StartingPosition startingPosition,
+ final String sequenceNumber) {
+ assertEquals(AFTER_SEQUENCE_NUMBER, startingPosition.type());
+ assertEquals(sequenceNumber, startingPosition.sequenceNumber());
+ }
+
+ private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final KinesisProxyV2Interface kinesis,
+ final SequenceNumber startingSequenceNumber) throws Exception {
+ return assertNumberOfMessagesReceivedFromKinesis(
+ expectedNumberOfMessages,
+ kinesis,
+ startingSequenceNumber,
+ efoProperties());
+ }
+
+ private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final KinesisProxyV2Interface kinesis,
+ final SequenceNumber startingSequenceNumber,
+ final Properties consumerConfig) throws Exception {
+ return ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+ expectedNumberOfMessages,
+ new FanOutRecordPublisherFactory(kinesis),
+ startingSequenceNumber,
+ consumerConfig);
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index b39b99e..40a599c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,52 +17,32 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
-import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
-import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
-import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.StringUtils;
+
import org.junit.Test;
-import org.mockito.Mockito;
-import java.math.BigInteger;
import java.text.SimpleDateFormat;
-import java.util.Collections;
import java.util.Date;
-import java.util.LinkedList;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
-import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
- * Tests for the {@link ShardConsumer}.
+ * Tests for the {@link ShardConsumer} using polling consumption and mocked Kinesis behaviours.
*/
public class ShardConsumerTest {
@@ -161,83 +141,24 @@ public class ShardConsumerTest {
verify(kinesis).getShardIterator(any(), eq("AT_SEQUENCE_NUMBER"), eq("0"));
}
- private SequenceNumber fakeSequenceNumber() {
- return new SequenceNumber("fakeStartingState");
- }
-
private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
- final int expectedNumberOfMessages,
- final KinesisProxyInterface kinesis,
- final SequenceNumber startingSequenceNumber) throws Exception {
+ final int expectedNumberOfMessages,
+ final KinesisProxyInterface kinesis,
+ final SequenceNumber startingSequenceNumber) throws Exception {
return assertNumberOfMessagesReceivedFromKinesis(expectedNumberOfMessages, kinesis, startingSequenceNumber, new Properties());
}
private ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
- final int expectedNumberOfMessages,
- final KinesisProxyInterface kinesis,
- final SequenceNumber startingSequenceNumber,
- final Properties consumerProperties) throws Exception {
- ShardConsumerMetricsReporter shardMetricsReporter = new ShardConsumerMetricsReporter(mock(MetricGroup.class));
-
- StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
-
- LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
- subscribedShardsStateUnderTest.add(
- new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
- fakeToBeConsumedShard, startingSequenceNumber));
-
- TestSourceContext<String> sourceContext = new TestSourceContext<>();
-
- KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
- new SimpleStringSchema());
- TestableKinesisDataFetcher<String> fetcher =
- new TestableKinesisDataFetcher<>(
- Collections.singletonList("fakeStream"),
- sourceContext,
- consumerProperties,
- deserializationSchema,
- 10,
- 2,
- new AtomicReference<>(),
- subscribedShardsStateUnderTest,
- KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
- Mockito.mock(KinesisProxyInterface.class));
-
- final StreamShardHandle shardHandle = subscribedShardsStateUnderTest.get(0).getStreamShardHandle();
- SequenceNumber lastProcessedSequenceNum = subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum();
- StartingPosition startingPosition = AWSUtil.getStartingPosition(lastProcessedSequenceNum, consumerProperties);
-
- final RecordPublisher recordPublisher = new PollingRecordPublisherFactory(config -> kinesis)
- .create(startingPosition, fetcher.getConsumerConfiguration(), mock(MetricGroup.class), shardHandle);
-
- int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
- new ShardConsumer<>(
- fetcher,
- recordPublisher,
- shardIndex,
- shardHandle,
- lastProcessedSequenceNum,
- shardMetricsReporter,
- deserializationSchema)
- .run();
-
- assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
- assertEquals(
- SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
- subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
-
- return shardMetricsReporter;
- }
-
- private static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
- return new StreamShardHandle(
- streamName,
- new Shard()
- .withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardId))
- .withHashKeyRange(
- new HashKeyRange()
- .withStartingHashKey("0")
- .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+ final int expectedNumberOfMessages,
+ final KinesisProxyInterface kinesis,
+ final SequenceNumber startingSequenceNumber,
+ final Properties consumerProperties) throws Exception {
+
+ return ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+ expectedNumberOfMessages,
+ new PollingRecordPublisherFactory(config -> kinesis),
+ startingSequenceNumber,
+ consumerProperties);
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
new file mode 100644
index 0000000..3be976a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.StringUtils;
+import org.mockito.Mockito;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link ShardConsumer}.
+ */
+public class ShardConsumerTestUtils {
+
+ public static <T> ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+ final int expectedNumberOfMessages,
+ final RecordPublisherFactory recordPublisherFactory,
+ final SequenceNumber startingSequenceNumber,
+ final Properties consumerProperties) throws InterruptedException {
+ ShardConsumerMetricsReporter shardMetricsReporter = new ShardConsumerMetricsReporter(mock(MetricGroup.class));
+
+ StreamShardHandle fakeToBeConsumedShard = getMockStreamShard("fakeStream", 0);
+
+ LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+ subscribedShardsStateUnderTest.add(
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
+ fakeToBeConsumedShard, startingSequenceNumber));
+
+ TestSourceContext<String> sourceContext = new TestSourceContext<>();
+
+ KinesisDeserializationSchemaWrapper<String> deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
+ new SimpleStringSchema());
+ TestableKinesisDataFetcher<String> fetcher =
+ new TestableKinesisDataFetcher<>(
+ Collections.singletonList("fakeStream"),
+ sourceContext,
+ consumerProperties,
+ deserializationSchema,
+ 10,
+ 2,
+ new AtomicReference<>(),
+ subscribedShardsStateUnderTest,
+ KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+ Mockito.mock(KinesisProxyInterface.class),
+ Mockito.mock(KinesisProxyV2Interface.class));
+
+ final StreamShardHandle shardHandle = subscribedShardsStateUnderTest.get(0).getStreamShardHandle();
+ final SequenceNumber lastProcessedSequenceNum = subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum();
+ final StartingPosition startingPosition = AWSUtil.getStartingPosition(lastProcessedSequenceNum, consumerProperties);
+
+ final RecordPublisher recordPublisher = recordPublisherFactory
+ .create(startingPosition, fetcher.getConsumerConfiguration(), mock(MetricGroup.class), shardHandle);
+
+ int shardIndex = fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
+ new ShardConsumer<>(
+ fetcher,
+ recordPublisher,
+ shardIndex,
+ shardHandle,
+ lastProcessedSequenceNum,
+ shardMetricsReporter,
+ deserializationSchema)
+ .run();
+
+ assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
+ assertEquals(
+ SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+ subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
+
+ return shardMetricsReporter;
+ }
+
+ public static StreamShardHandle getMockStreamShard(String streamName, int shardId) {
+ return new StreamShardHandle(
+ streamName,
+ new Shard()
+ .withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardId))
+ .withHashKeyRange(
+ new HashKeyRange()
+ .withStartingHashKey("0")
+ .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+ }
+
+ public static SequenceNumber fakeSequenceNumber() {
+ return new SequenceNumber("fakeStartingState");
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
index ebeaa33..2b4ee99 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfigurationTest.java
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
new file mode 100644
index 0000000..97aef9f
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SingleShardFanOutKinesisV2;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.TestConsumer;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2.NUMBER_OF_SUBSCRIPTIONS;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.emptyShard;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.singletonShard;
+import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.LATEST;
+import static software.amazon.awssdk.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+
+/**
+ * Tests for {@link FanOutRecordPublisher}.
+ */
+public class FanOutRecordPublisherTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private static final long EXPECTED_SUBSCRIBE_TO_SHARD_MAX = 1;
+ private static final long EXPECTED_SUBSCRIBE_TO_SHARD_BASE = 2;
+ private static final double EXPECTED_SUBSCRIBE_TO_SHARD_POW = 0.5;
+ private static final int EXPECTED_SUBSCRIBE_TO_SHARD_RETRIES = 3;
+
+ private static final String DUMMY_SEQUENCE = "1";
+
+ private static final SequenceNumber SEQUENCE_NUMBER = new SequenceNumber(DUMMY_SEQUENCE);
+
+ private static final SequenceNumber AGGREGATED_SEQUENCE_NUMBER = new SequenceNumber(DUMMY_SEQUENCE, 1L);
+
+ @Test
+ public void testToSdkV2StartingPositionAfterSequenceNumber() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+ RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.continueFromSequenceNumber(SEQUENCE_NUMBER));
+ publisher.run(new TestConsumer());
+
+ assertEquals(DUMMY_SEQUENCE, kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+ assertEquals(AFTER_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testToSdkV2StartingPositionAtSequenceNumber() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+ RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.restartFromSequenceNumber(AGGREGATED_SEQUENCE_NUMBER));
+ publisher.run(new TestConsumer());
+
+ assertEquals(DUMMY_SEQUENCE, kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+ assertEquals(AT_SEQUENCE_NUMBER, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testToSdkV2StartingPositionLatest() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+ RecordPublisher publisher = createRecordPublisher(kinesis, latest());
+ publisher.run(new TestConsumer());
+
+ assertNull(kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+ assertEquals(LATEST, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testToSdkV2StartingPositionTrimHorizon() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = emptyShard();
+
+ RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.continueFromSequenceNumber(SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
+ publisher.run(new TestConsumer());
+
+ assertNull(kinesis.getStartingPositionForSubscription(0).sequenceNumber());
+ assertEquals(TRIM_HORIZON, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testToSdkV2StartingPositionAtTimeStamp() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = emptyShard();
+ Date now = new Date();
+
+ RecordPublisher publisher = createRecordPublisher(kinesis, StartingPosition.fromTimestamp(now));
+ publisher.run(new TestConsumer());
+
+ assertEquals(now.toInstant(), kinesis.getStartingPositionForSubscription(0).timestamp());
+ assertEquals(AT_TIMESTAMP, kinesis.getStartingPositionForSubscription(0).type());
+ }
+
+ @Test
+ public void testToSdkV1Records() throws Exception {
+ Date now = new Date();
+ byte[] data = new byte[] { 0, 1, 2, 3 };
+
+ Record record = Record
+ .builder()
+ .approximateArrivalTimestamp(now.toInstant())
+ .partitionKey("pk")
+ .sequenceNumber("sn")
+ .data(SdkBytes.fromByteArray(data))
+ .build();
+
+ KinesisProxyV2Interface kinesis = singletonShard(createSubscribeToShardEvent(record));
+ RecordPublisher publisher = createRecordPublisher(kinesis, latest());
+
+ TestConsumer consumer = new TestConsumer();
+ publisher.run(consumer);
+
+ UserRecord actual = consumer.getRecordBatches().get(0).getDeaggregatedRecords().get(0);
+ assertFalse(actual.isAggregated());
+ assertEquals(now, actual.getApproximateArrivalTimestamp());
+ assertEquals("sn", actual.getSequenceNumber());
+ assertEquals("pk", actual.getPartitionKey());
+ assertThat(toByteArray(actual.getData()), Matchers.equalTo(data));
+ }
+
+ @Test
+ public void testExceptionThrownInConsumerPropagatesToRecordPublisher() throws Exception {
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("An error thrown from the consumer");
+
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.boundedShard().build();
+ RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+
+ recordPublisher.run(batch -> {
+ throw new RuntimeException("An error thrown from the consumer");
+ });
+ }
+
+ @Test
+ public void testResourceNotFoundWhenObtainingSubscriptionTerminatesApplication() throws Exception {
+ thrown.expect(ResourceNotFoundException.class);
+
+ KinesisProxyV2Interface kinesis = FakeKinesisFanOutBehavioursFactory.resourceNotFoundWhenObtainingSubscription();
+ RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+
+ recordPublisher.run(new TestConsumer());
+ }
+
+ @Test
+ public void testShardConsumerCompletesIfResourceNotFoundExceptionThrownFromSubscription() throws Exception {
+ ResourceNotFoundException exception = ResourceNotFoundException.builder().build();
+ SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(exception);
+ RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+ TestConsumer consumer = new TestConsumer();
+
+ assertEquals(COMPLETE, recordPublisher.run(consumer));
+
+ // Will exit on the first subscription
+ assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
+ }
+
+ @Test
+ public void testShardConsumerRetriesIfLimitExceededExceptionThrownFromSubscription() throws Exception {
+ LimitExceededException exception = LimitExceededException.builder().build();
+ SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(exception);
+ RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+ TestConsumer consumer = new TestConsumer();
+
+ int count = 0;
+ while (recordPublisher.run(consumer) == INCOMPLETE) {
+ if (++count > NUMBER_OF_SUBSCRIPTIONS + 1) {
+ break;
+ }
+ }
+
+ // An exception is thrown on the 5th subscription and then the subscription completes on the next
+ assertEquals(NUMBER_OF_SUBSCRIPTIONS + 1, kinesis.getNumberOfSubscribeToShardInvocations());
+ }
+
+ @Test
+ public void testSubscribeToShardBacksOffForRetryableError() throws Exception {
+ LimitExceededException retryableError = LimitExceededException.builder().build();
+ SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(retryableError);
+ FanOutRecordPublisherConfiguration configuration = createConfiguration();
+
+ FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+ when(backoff.calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), anyInt())).thenReturn(100L);
+
+ new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff)
+ .run(new TestConsumer());
+
+ verify(backoff).calculateFullJitterBackoff(
+ EXPECTED_SUBSCRIBE_TO_SHARD_BASE,
+ EXPECTED_SUBSCRIBE_TO_SHARD_MAX,
+ EXPECTED_SUBSCRIBE_TO_SHARD_POW,
+ 1
+ );
+
+ verify(backoff).sleep(100L);
+ }
+
+ @Test
+ public void testSubscribeToShardFailsWhenMaxRetriesExceeded() throws Exception {
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Maximum reties exceeded for SubscribeToShard. Failed 3 times.");
+
+ Properties efoProperties = createEfoProperties();
+ efoProperties.setProperty(SUBSCRIBE_TO_SHARD_RETRIES, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_RETRIES));
+ FanOutRecordPublisherConfiguration configuration = new FanOutRecordPublisherConfiguration(efoProperties, emptyList());
+
+ LimitExceededException retryableError = LimitExceededException.builder().build();
+ SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(retryableError);
+ FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+
+ FanOutRecordPublisher recordPublisher = new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff);
+
+ int count = 0;
+ while (recordPublisher.run(new TestConsumer()) == INCOMPLETE) {
+ if (++count > 3) {
+ break;
+ }
+ }
+ }
+
+ @Test
+ public void testSubscribeToShardBacksOffAttemptIncreases() throws Exception {
+ LimitExceededException retryableError = LimitExceededException.builder().build();
+ SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(retryableError);
+ FanOutRecordPublisherConfiguration configuration = createConfiguration();
+
+ FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+
+ FanOutRecordPublisher recordPublisher = new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff);
+
+ recordPublisher.run(new TestConsumer());
+ recordPublisher.run(new TestConsumer());
+ recordPublisher.run(new TestConsumer());
+
+ verify(backoff).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(1));
+ verify(backoff).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(2));
+ verify(backoff).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(3));
+
+ verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(0));
+ verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(4));
+ }
+
+ @Test
+ public void testBackOffAttemptResetsWithSuccessfulSubscription() throws Exception {
+ SubscriptionErrorKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory.alternatingSuccessErrorDuringSubscription();
+ FanOutRecordPublisherConfiguration configuration = createConfiguration();
+
+ FullJitterBackoff backoff = mock(FullJitterBackoff.class);
+
+ FanOutRecordPublisher recordPublisher = new FanOutRecordPublisher(latest(), "arn", createDummyStreamShardHandle(), kinesis, configuration, backoff);
+
+ recordPublisher.run(new TestConsumer());
+ recordPublisher.run(new TestConsumer());
+ recordPublisher.run(new TestConsumer());
+
+ // Expecting:
+ // - first attempt to fail, and backoff attempt #1
+ // - second attempt to succeed, and reset attempt index
+ // - third attempt to fail, and backoff attempt #1
+
+ verify(backoff, times(2)).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(1));
+
+ verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(0));
+ verify(backoff, never()).calculateFullJitterBackoff(anyLong(), anyLong(), anyDouble(), eq(2));
+ }
+
+ @Test
+ public void testRecordDurability() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(10)
+ .withBatchesPerSubscription(3)
+ .withRecordsPerBatch(12)
+ .build();
+
+ RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+ TestConsumer consumer = new TestConsumer();
+
+ int count = 0;
+ while (recordPublisher.run(consumer) == INCOMPLETE) {
+ if (++count > 4) {
+ break;
+ }
+ }
+
+ List<UserRecord> userRecords = flattenToUserRecords(consumer.getRecordBatches());
+
+ // Should have received 10 * 12 = 120 records
+ assertEquals(120, userRecords.size());
+
+ int expectedSequenceNumber = 1;
+ for (UserRecord record : userRecords) {
+ assertEquals(String.valueOf(expectedSequenceNumber++), record.getSequenceNumber());
+ }
+ }
+
+ @Test
+ public void testAggregatedRecordDurability() throws Exception {
+ SingleShardFanOutKinesisV2 kinesis = FakeKinesisFanOutBehavioursFactory
+ .boundedShard()
+ .withBatchCount(10)
+ .withAggregationFactor(5)
+ .withRecordsPerBatch(12)
+ .build();
+
+ RecordPublisher recordPublisher = createRecordPublisher(kinesis);
+ TestConsumer consumer = new TestConsumer();
+
+ int count = 0;
+ while (recordPublisher.run(consumer) == INCOMPLETE) {
+ if (++count > 5) {
+ break;
+ }
+ }
+
+ List<UserRecord> userRecords = flattenToUserRecords(consumer.getRecordBatches());
+
+ // Should have received 10 * 12 * 5 = 600 records
+ assertEquals(600, userRecords.size());
+
+ int sequence = 1;
+ long subsequence = 0;
+ for (UserRecord userRecord : userRecords) {
+ assertEquals(String.valueOf(sequence), userRecord.getSequenceNumber());
+ assertEquals(subsequence++, userRecord.getSubSequenceNumber());
+
+ if (subsequence == 5) {
+ sequence++;
+ subsequence = 0;
+ }
+ }
+ }
+
+ private List<UserRecord> flattenToUserRecords(final List<RecordBatch> recordBatch) {
+ return recordBatch
+ .stream()
+ .flatMap(b -> b.getDeaggregatedRecords().stream())
+ .collect(Collectors.toList());
+ }
+
+ private byte[] toByteArray(final ByteBuffer byteBuffer) {
+ byte[] dataBytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(dataBytes);
+ return dataBytes;
+ }
+
+ private RecordPublisher createRecordPublisher(final KinesisProxyV2Interface kinesis) {
+ return createRecordPublisher(kinesis, latest());
+ }
+
+ private RecordPublisher createRecordPublisher(final KinesisProxyV2Interface kinesis, final StartingPosition startingPosition) {
+ return new FanOutRecordPublisher(startingPosition, "arn", createDummyStreamShardHandle(), kinesis, createConfiguration(), new FullJitterBackoff());
+ }
+
+ private FanOutRecordPublisherConfiguration createConfiguration() {
+ return new FanOutRecordPublisherConfiguration(createEfoProperties(), emptyList());
+ }
+
+ private Properties createEfoProperties() {
+ Properties config = new Properties();
+ config.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+ config.setProperty(EFO_CONSUMER_NAME, "dummy-efo-consumer");
+ config.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_BASE, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_BASE));
+ config.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_MAX, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_MAX));
+ config.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT, String.valueOf(EXPECTED_SUBSCRIBE_TO_SHARD_POW));
+ return config;
+ }
+
+ private SubscribeToShardEvent createSubscribeToShardEvent(final Record...records) {
+ return SubscribeToShardEvent
+ .builder()
+ .records(records)
+ .build();
+ }
+
+ private StartingPosition latest() {
+ return StartingPosition.continueFromSequenceNumber(SENTINEL_LATEST_SEQUENCE_NUM.get());
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
index 2bbe3da..6b6107e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisherTest.java
@@ -18,23 +18,17 @@
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
-import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordBatchConsumer;
import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.TestConsumer;
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
@@ -62,9 +56,9 @@ public class PollingRecordPublisherTest {
TestConsumer consumer = new TestConsumer();
recordPublisher.run(consumer);
- assertEquals(1, consumer.recordBatches.size());
- assertEquals(5, consumer.recordBatches.get(0).getDeaggregatedRecordSize());
- assertEquals(100L, consumer.recordBatches.get(0).getMillisBehindLatest(), 0);
+ assertEquals(1, consumer.getRecordBatches().size());
+ assertEquals(5, consumer.getRecordBatches().get(0).getDeaggregatedRecordSize());
+ assertEquals(100L, consumer.getRecordBatches().get(0).getMillisBehindLatest(), 0);
}
@Test
@@ -146,20 +140,4 @@ public class PollingRecordPublisherTest {
500L);
}
- private static class TestConsumer implements RecordBatchConsumer {
- private final List<RecordBatch> recordBatches = new ArrayList<>();
- private String latestSequenceNumber;
-
- @Override
- public SequenceNumber accept(final RecordBatch batch) {
- recordBatches.add(batch);
-
- if (batch.getDeaggregatedRecordSize() > 0) {
- List<UserRecord> records = batch.getDeaggregatedRecords();
- latestSequenceNumber = records.get(records.size() - 1).getSequenceNumber();
- }
-
- return new SequenceNumber(latestSequenceNumber);
- }
- }
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Test.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Test.java
new file mode 100644
index 0000000..f7641c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Test.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link KinesisProxyV2}.
+ */
+public class KinesisProxyV2Test {
+
+ @Test
+ public void testSubscribeToShard() {
+ KinesisAsyncClient kinesis = mock(KinesisAsyncClient.class);
+ KinesisProxyV2 proxy = new KinesisProxyV2(kinesis);
+
+ SubscribeToShardRequest request = SubscribeToShardRequest.builder().build();
+ SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
+ .builder()
+ .subscriber(event -> {})
+ .build();
+
+ proxy.subscribeToShard(request, responseHandler);
+
+ // The proxy is expected to delegate straight through to the wrapped async client.
+ verify(kinesis).subscribeToShard(eq(request), eq(responseHandler));
+ }
+
+ @Test
+ public void testCloseInvokesClientClose() {
+ KinesisAsyncClient kinesis = mock(KinesisAsyncClient.class);
+ KinesisProxyV2 proxy = new KinesisProxyV2(kinesis);
+
+ proxy.close();
+
+ // Closing the proxy must release the underlying client.
+ verify(kinesis).close();
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
new file mode 100644
index 0000000..83e299a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+
+import com.amazonaws.kinesis.agg.RecordAggregator;
+import org.apache.commons.lang3.NotImplementedException;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyV2Interface} interface.
+ */
+public class FakeKinesisFanOutBehavioursFactory {
+
+ // A shard that terminates after serving a configurable number of batches/records.
+ public static SingleShardFanOutKinesisV2.Builder boundedShard() {
+ return new SingleShardFanOutKinesisV2.Builder();
+ }
+
+ // A shard whose single subscription delivers exactly the given event.
+ public static KinesisProxyV2Interface singletonShard(final SubscribeToShardEvent event) {
+ return new SingletonEventFanOutKinesisV2(event);
+ }
+
+ // A shard with zero batches, i.e. no records at all.
+ public static SingleShardFanOutKinesisV2 emptyShard() {
+ return new SingleShardFanOutKinesisV2.Builder().withBatchCount(0).build();
+ }
+
+ // Fails the response handler with ResourceNotFoundException before any events flow.
+ public static KinesisProxyV2Interface resourceNotFoundWhenObtainingSubscription() {
+ return new ExceptionalKinesisV2(ResourceNotFoundException.builder().build());
+ }
+
+ public static SubscriptionErrorKinesisV2 errorDuringSubscription(final Throwable throwable) {
+ return new SubscriptionErrorKinesisV2(throwable);
+ }
+
+ public static SubscriptionErrorKinesisV2 alternatingSuccessErrorDuringSubscription() {
+ return new AlternatingSubscriptionErrorKinesisV2(LimitExceededException.builder().build());
+ }
+
+ // First subscription: one empty event carrying continuation "1".
+ // Second subscription: one event with a single record and no continuation.
+ public static AbstractSingleShardFanOutKinesisV2 emptyBatchFollowedBySingleRecord() {
+ return new AbstractSingleShardFanOutKinesisV2(2) {
+ private int subscription = 0;
+
+ @Override
+ void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+ SubscribeToShardEvent.Builder builder = SubscribeToShardEvent
+ .builder()
+ .continuationSequenceNumber(subscription == 0 ? "1" : null);
+
+ if (subscription == 1) {
+ builder.records(createRecord(new AtomicInteger(1)));
+ }
+
+ subscriber.onNext(builder.build());
+ subscription++;
+ }
+ };
+ }
+
+ /**
+ * A fake Kinesis whose subscriptions each deliver 5 records and alternate between
+ * signalling the given error and completing successfully.
+ * The first subscription fails, the second succeeds, and so on.
+ */
+ private static class AlternatingSubscriptionErrorKinesisV2 extends SubscriptionErrorKinesisV2 {
+
+ int index = 0;
+
+ private AlternatingSubscriptionErrorKinesisV2(final Throwable throwable) {
+ super(throwable);
+ }
+
+ @Override
+ void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+ // Even-indexed subscriptions inherit the batch-then-onError behaviour;
+ // odd-indexed subscriptions send the batch and complete normally.
+ if (index % 2 == 0) {
+ super.sendEvents(subscriber);
+ } else {
+ super.sendEventBatch(subscriber);
+ subscriber.onComplete();
+ }
+
+ index++;
+ }
+ }
+
+ /**
+ * A fake Kinesis that signals the given error after sending 5 records.
+ * A total of 5 subscriptions can be acquired.
+ */
+ public static class SubscriptionErrorKinesisV2 extends AbstractSingleShardFanOutKinesisV2 {
+
+ public static final int NUMBER_OF_SUBSCRIPTIONS = 5;
+
+ public static final int NUMBER_OF_EVENTS_PER_SUBSCRIPTION = 5;
+
+ private final Throwable throwable;
+
+ // Shared across subscriptions so record sequence numbers keep increasing.
+ AtomicInteger sequenceNumber = new AtomicInteger();
+
+ private SubscriptionErrorKinesisV2(final Throwable throwable) {
+ super(NUMBER_OF_SUBSCRIPTIONS);
+ this.throwable = throwable;
+ }
+
+ @Override
+ void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+ // Deliver a full batch of events, then fail the subscription.
+ sendEventBatch(subscriber);
+ subscriber.onError(throwable);
+ }
+
+ // Emits NUMBER_OF_EVENTS_PER_SUBSCRIPTION single-record events; each event's
+ // continuation sequence number is the loop index.
+ void sendEventBatch(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+ for (int i = 0; i < NUMBER_OF_EVENTS_PER_SUBSCRIPTION; i++) {
+ subscriber.onNext(SubscribeToShardEvent
+ .builder()
+ .records(createRecord(sequenceNumber))
+ .continuationSequenceNumber(String.valueOf(i))
+ .build());
+ }
+ }
+ }
+
+ // Reports the configured exception to the response handler immediately;
+ // the returned future itself completes successfully.
+ private static class ExceptionalKinesisV2 extends KinesisProxyV2InterfaceAdapter {
+
+ private final RuntimeException exception;
+
+ private ExceptionalKinesisV2(RuntimeException exception) {
+ this.exception = exception;
+ }
+
+ @Override
+ public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
+ responseHandler.exceptionOccurred(exception);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ // Serves a single fixed event on its one allowed subscription.
+ private static class SingletonEventFanOutKinesisV2 extends AbstractSingleShardFanOutKinesisV2 {
+
+ private final SubscribeToShardEvent event;
+
+ private SingletonEventFanOutKinesisV2(SubscribeToShardEvent event) {
+ super(1);
+ this.event = event;
+ }
+
+ @Override
+ void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) {
+ subscriber.onNext(event);
+ }
+ }
+
+ /**
+ * A fake implementation of KinesisProxyV2 SubscribeToShard that provides dummy records for EFO subscriptions.
+ * Aggregated and non-aggregated records are supported with various batch and subscription sizes.
+ */
+ public static class SingleShardFanOutKinesisV2 extends AbstractSingleShardFanOutKinesisV2 {
+
+ private final int batchesPerSubscription;
+
+ private final int recordsPerBatch;
+
+ private final long millisBehindLatest;
+
+ private final int totalRecords;
+
+ private final int aggregationFactor;
+
+ // Monotonically increasing across batches and subscriptions.
+ private final AtomicInteger sequenceNumber = new AtomicInteger();
+
+ private SingleShardFanOutKinesisV2(final Builder builder) {
+ super(builder.getSubscriptionCount());
+ this.batchesPerSubscription = builder.batchesPerSubscription;
+ this.recordsPerBatch = builder.recordsPerBatch;
+ this.millisBehindLatest = builder.millisBehindLatest;
+ this.aggregationFactor = builder.aggregationFactor;
+ this.totalRecords = builder.getTotalRecords();
+ }
+
+ @Override
+ void sendEvents(final Subscriber<? super SubscribeToShardEventStream> subscriber) {
+ SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent
+ .builder()
+ .millisBehindLatest(millisBehindLatest);
+
+ // Emit batches until the per-subscription limit or the overall record budget is hit.
+ for (int batchIndex = 0; batchIndex < batchesPerSubscription && sequenceNumber.get() < totalRecords; batchIndex++) {
+ List<Record> records = new ArrayList<>();
+
+ for (int i = 0; i < recordsPerBatch; i++) {
+ final Record record;
+
+ // aggregationFactor == 1 yields plain records; otherwise KPL-aggregated records.
+ if (aggregationFactor == 1) {
+ record = createRecord(sequenceNumber);
+ } else {
+ record = createAggregatedRecord(aggregationFactor, sequenceNumber);
+ }
+
+ records.add(record);
+ }
+
+ eventBuilder.records(records);
+
+ // Null continuation marks the final batch once all records have been emitted.
+ String continuation = sequenceNumber.get() < totalRecords ? String.valueOf(sequenceNumber.get() + 1) : null;
+ eventBuilder.continuationSequenceNumber(continuation);
+
+ subscriber.onNext(eventBuilder.build());
+ }
+ }
+
+ /**
+ * A convenience builder for {@link SingleShardFanOutKinesisV2}.
+ */
+ public static class Builder {
+ private int batchesPerSubscription = 100000;
+ private int recordsPerBatch = 10;
+ private long millisBehindLatest = 0;
+ private int batchCount = 1;
+ private int aggregationFactor = 1;
+
+ // Number of subscriptions required to drain all records given the
+ // per-subscription batch and per-batch record limits.
+ public int getSubscriptionCount() {
+ return (int) Math.ceil((double) getTotalRecords() / batchesPerSubscription / recordsPerBatch);
+ }
+
+ public int getTotalRecords() {
+ return batchCount * recordsPerBatch;
+ }
+
+ public Builder withBatchesPerSubscription(final int batchesPerSubscription) {
+ this.batchesPerSubscription = batchesPerSubscription;
+ return this;
+ }
+
+ public Builder withRecordsPerBatch(final int recordsPerBatch) {
+ this.recordsPerBatch = recordsPerBatch;
+ return this;
+ }
+
+ public Builder withBatchCount(final int batchCount) {
+ this.batchCount = batchCount;
+ return this;
+ }
+
+ public Builder withMillisBehindLatest(final long millisBehindLatest) {
+ this.millisBehindLatest = millisBehindLatest;
+ return this;
+ }
+
+ public Builder withAggregationFactor(final int aggregationFactor) {
+ this.aggregationFactor = aggregationFactor;
+ return this;
+ }
+
+ public SingleShardFanOutKinesisV2 build() {
+ return new SingleShardFanOutKinesisV2(this);
+ }
+ }
+ }
+
+ /**
+ * A single shard dummy EFO implementation that provides basic responses and subscription management.
+ * Does not provide any records.
+ */
+ public abstract static class AbstractSingleShardFanOutKinesisV2 extends KinesisProxyV2InterfaceAdapter {
+
+ // All SubscribeToShard requests received, retained for test assertions.
+ private final List<SubscribeToShardRequest> requests = new ArrayList<>();
+
+ private int remainingSubscriptions;
+
+ private AbstractSingleShardFanOutKinesisV2(final int remainingSubscriptions) {
+ this.remainingSubscriptions = remainingSubscriptions;
+ }
+
+ public int getNumberOfSubscribeToShardInvocations() {
+ return requests.size();
+ }
+
+ public StartingPosition getStartingPositionForSubscription(final int subscriptionIndex) {
+ assertTrue(subscriptionIndex >= 0);
+ assertTrue(subscriptionIndex < getNumberOfSubscribeToShardInvocations());
+
+ return requests.get(subscriptionIndex).startingPosition();
+ }
+
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ final SubscribeToShardRequest request,
+ final SubscribeToShardResponseHandler responseHandler) {
+
+ requests.add(request);
+
+ // Deliver the response and event stream asynchronously, mimicking the SDK.
+ return CompletableFuture.supplyAsync(() -> {
+ responseHandler.responseReceived(SubscribeToShardResponse.builder().build());
+
+ responseHandler.onEventStream(subscriber -> {
+ subscriber.onSubscribe(mock(Subscription.class));
+
+ if (remainingSubscriptions > 0) {
+ sendEvents(subscriber);
+ remainingSubscriptions--;
+ } else {
+ // Subscriptions exhausted: send one empty event with a null
+ // continuation sequence number to signal the end of the shard.
+ SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent
+ .builder()
+ .millisBehindLatest(0L)
+ .continuationSequenceNumber(null);
+
+ subscriber.onNext(eventBuilder.build());
+ }
+
+ subscriber.onComplete();
+ });
+
+ return null;
+ });
+ }
+
+ // Subclasses define the events delivered on each acquired subscription.
+ abstract void sendEvents(final Subscriber<? super SubscribeToShardEventStream> subscriber);
+
+ }
+
+ // Base adapter that fails loudly for any method a fake does not override.
+ private static class KinesisProxyV2InterfaceAdapter implements KinesisProxyV2Interface {
+
+ @Override
+ public CompletableFuture<Void> subscribeToShard(SubscribeToShardRequest request, SubscribeToShardResponseHandler responseHandler) {
+ throw new NotImplementedException("This method is not implemented.");
+ }
+ }
+
+ // Creates a record with 32 random bytes of data.
+ private static Record createRecord(final AtomicInteger sequenceNumber) {
+ return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber);
+ }
+
+ // Creates a record with the given payload; the sequence number is taken
+ // from (and increments) the shared counter.
+ private static Record createRecord(final byte[] data, final AtomicInteger sequenceNumber) {
+ return Record
+ .builder()
+ .approximateArrivalTimestamp(Instant.now())
+ .data(SdkBytes.fromByteArray(data))
+ .sequenceNumber(String.valueOf(sequenceNumber.incrementAndGet()))
+ .partitionKey("pk")
+ .build();
+ }
+
+ // Packs aggregationFactor user records into a single KPL-aggregated record.
+ private static Record createAggregatedRecord(final int aggregationFactor, final AtomicInteger sequenceNumber) {
+ RecordAggregator recordAggregator = new RecordAggregator();
+
+ for (int i = 0; i < aggregationFactor; i++) {
+ try {
+ recordAggregator.addUserRecord("pk", randomAlphabetic(32).getBytes(UTF_8));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return createRecord(recordAggregator.clearAndGet().toRecordBytes(), sequenceNumber);
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
index 7c5c786..abb186b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestUtils.java
@@ -19,10 +19,14 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import com.amazonaws.kinesis.agg.AggRecord;
import com.amazonaws.kinesis.agg.RecordAggregator;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -39,6 +43,12 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
+
/**
* General test utils.
*/
@@ -116,4 +126,35 @@ public class TestUtils {
return new StreamShardHandle(streamName, shard);
}
+ // Consumer properties selecting the EFO record publisher with registration
+ // type NONE, i.e. a pre-registered consumer ARN supplied for "fakeStream".
+ public static Properties efoProperties() {
+ Properties consumerConfig = new Properties();
+ consumerConfig.setProperty(RECORD_PUBLISHER_TYPE, EFO.name());
+ consumerConfig.setProperty(EFO_REGISTRATION_TYPE, NONE.name());
+ consumerConfig.setProperty(EFO_CONSUMER_ARN_PREFIX + "." + "fakeStream", "stream-consumer-arn");
+ return consumerConfig;
+ }
+
+ /**
+ * A test record consumer used to capture messages from kinesis.
+ * Retains every received batch and returns the sequence number of the
+ * latest deaggregated record seen so far.
+ */
+ public static class TestConsumer implements RecordPublisher.RecordBatchConsumer {
+ private final List<RecordBatch> recordBatches = new ArrayList<>();
+ private String latestSequenceNumber;
+
+ @Override
+ public SequenceNumber accept(final RecordBatch batch) {
+ recordBatches.add(batch);
+
+ // Empty batches keep the previously observed sequence number.
+ if (batch.getDeaggregatedRecordSize() > 0) {
+ List<UserRecord> records = batch.getDeaggregatedRecords();
+ latestSequenceNumber = records.get(records.size() - 1).getSequenceNumber();
+ }
+
+ return new SequenceNumber(latestSequenceNumber);
+ }
+
+ // Exposes all captured batches for assertions.
+ public List<RecordBatch> getRecordBatches() {
+ return recordBatches;
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index 3bb11bd..cd32361 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetche
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.mockito.Mockito;
@@ -48,13 +49,39 @@ import static org.mockito.Mockito.when;
*/
public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
- private OneShotLatch runWaiter;
- private OneShotLatch initialDiscoveryWaiter;
- private OneShotLatch shutdownWaiter;
+ private final OneShotLatch runWaiter;
+ private final OneShotLatch initialDiscoveryWaiter;
+ private final OneShotLatch shutdownWaiter;
private volatile boolean running;
public TestableKinesisDataFetcher(
+ List<String> fakeStreams,
+ SourceFunction.SourceContext<T> sourceContext,
+ Properties fakeConfiguration,
+ KinesisDeserializationSchema<T> deserializationSchema,
+ int fakeTotalCountOfSubtasks,
+ int fakeIndexOfThisSubtask,
+ AtomicReference<Throwable> thrownErrorUnderTest,
+ LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
+ HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+ KinesisProxyInterface fakeKinesis) {
+
+ this(
+ fakeStreams,
+ sourceContext,
+ fakeConfiguration,
+ deserializationSchema,
+ fakeTotalCountOfSubtasks,
+ fakeIndexOfThisSubtask,
+ thrownErrorUnderTest,
+ subscribedShardsStateUnderTest,
+ subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+ fakeKinesis,
+ null);
+ }
+
+ public TestableKinesisDataFetcher(
List<String> fakeStreams,
SourceFunction.SourceContext<T> sourceContext,
Properties fakeConfiguration,
@@ -64,7 +91,8 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
AtomicReference<Throwable> thrownErrorUnderTest,
LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
- KinesisProxyInterface fakeKinesis) {
+ KinesisProxyInterface fakeKinesis,
+ KinesisProxyV2Interface fakeKinesisV2) {
super(
fakeStreams,
sourceContext,
@@ -78,7 +106,8 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
thrownErrorUnderTest,
subscribedShardsStateUnderTest,
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
- (properties) -> fakeKinesis);
+ properties -> fakeKinesis,
+ properties -> fakeKinesisV2);
this.runWaiter = new OneShotLatch();
this.initialDiscoveryWaiter = new OneShotLatch();
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
index 74bb0c5..a45a463 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcherForShardConsumerException.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -55,7 +56,8 @@ public class TestableKinesisDataFetcherForShardConsumerException<T> extends Test
final AtomicReference<Throwable> thrownErrorUnderTest,
final LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
final HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
- final KinesisProxyInterface fakeKinesis) {
+ final KinesisProxyInterface fakeKinesis,
+ final RecordPublisherFactory recordPublisherFactory) {
super(fakeStreams, sourceContext, fakeConfiguration, deserializationSchema, fakeTotalCountOfSubtasks,
fakeIndexOfThisSubtask, thrownErrorUnderTest, subscribedShardsStateUnderTest,
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest, fakeKinesis);
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
index e66ff51..cdb871b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
@@ -40,7 +40,6 @@ import java.util.Date;
import java.util.Properties;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
-import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.AUTO;
@@ -52,7 +51,7 @@ import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequen
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@@ -61,6 +60,7 @@ import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PrepareForTest(AWSUtil.class)
public class AWSUtilTest {
+
@Rule
private final ExpectedException exception = ExpectedException.none();
@@ -214,8 +214,8 @@ public class AWSUtilTest {
public void testGetStartingPositionForLatest() {
StartingPosition position = AWSUtil.getStartingPosition(SENTINEL_LATEST_SEQUENCE_NUM.get(), new Properties());
- assertEquals(LATEST, position.getShardIteratorType());
- assertNull(position.getStartingMarker());
+ assertEquals(AT_TIMESTAMP, position.getShardIteratorType());
+ assertNotNull(position.getStartingMarker());
}
@Test
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 0862ec2..4d96ab8 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -33,6 +33,7 @@ import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsPr
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
@@ -48,6 +49,8 @@ import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigCons
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -59,6 +62,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.http.Protocol.HTTP2;
/**
* Tests for {@link AwsV2Util}.
@@ -227,39 +231,40 @@ public class AwsV2UtilTest {
ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
verify(builder).build();
- verify(builder).maxConcurrency(50);
+ verify(builder).maxConcurrency(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY);
verify(builder).connectionTimeout(Duration.ofSeconds(10));
verify(builder).writeTimeout(Duration.ofSeconds(50));
verify(builder).connectionMaxIdleTime(Duration.ofMinutes(1));
verify(builder).useIdleConnectionReaper(true);
+ verify(builder).protocol(HTTP2);
verify(builder, never()).connectionTimeToLive(any());
}
@Test
- public void testCreateNettyHttpClientMaxConcurrency() {
+ public void testCreateNettyHttpClientConnectionTimeout() {
ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
- clientConfiguration.setMaxConnections(100);
+ clientConfiguration.setConnectionTimeout(1000);
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
- verify(builder).maxConcurrency(100);
+ verify(builder).connectionTimeout(Duration.ofSeconds(1));
}
@Test
- public void testCreateNettyHttpClientConnectionTimeout() {
- ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
- clientConfiguration.setConnectionTimeout(1000);
+ public void testCreateNettyHttpClientMaxConcurrency() {
+ Properties clientConfiguration = new Properties();
+ clientConfiguration.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "123");
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(new ClientConfigurationFactory().getConfig(), builder, clientConfiguration);
- verify(builder).connectionTimeout(Duration.ofSeconds(1));
+ verify(builder).maxConcurrency(123);
}
@Test
@@ -269,7 +274,7 @@ public class AwsV2UtilTest {
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
verify(builder).writeTimeout(Duration.ofSeconds(3));
}
@@ -281,7 +286,7 @@ public class AwsV2UtilTest {
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
verify(builder).connectionMaxIdleTime(Duration.ofSeconds(2));
}
@@ -293,7 +298,7 @@ public class AwsV2UtilTest {
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
verify(builder).useIdleConnectionReaper(false);
}
@@ -305,7 +310,7 @@ public class AwsV2UtilTest {
NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
- AwsV2Util.createHttpClient(clientConfiguration, builder);
+ AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties());
verify(builder).connectionTimeToLive(Duration.ofSeconds(5));
}
@@ -383,6 +388,9 @@ public class AwsV2UtilTest {
when(builder.writeTimeout(any())).thenReturn(builder);
when(builder.connectionMaxIdleTime(any())).thenReturn(builder);
when(builder.useIdleConnectionReaper(anyBoolean())).thenReturn(builder);
+ when(builder.connectionAcquisitionTimeout(any())).thenReturn(builder);
+ when(builder.protocol(any())).thenReturn(builder);
+ when(builder.http2Configuration(any(Http2Configuration.class))).thenReturn(builder);
return builder;
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index bb0f6a1..ee32b6b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -38,6 +38,7 @@ import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
import static org.junit.Assert.assertEquals;
@@ -50,7 +51,7 @@ import static org.junit.Assert.fail;
public class KinesisConfigUtilTest {
@Rule
- private ExpectedException exception = ExpectedException.none();
+ public ExpectedException exception = ExpectedException.none();
// ----------------------------------------------------------------------
// getValidatedProducerConfiguration() tests
@@ -214,6 +215,7 @@ public class KinesisConfigUtilTest {
// ----------------------------------------------------------------------
// validateEfoConfiguration() tests
// ----------------------------------------------------------------------
+
@Test
public void testNoEfoRegistrationTypeInConfig() {
Properties testConfig = TestUtils.getStandardProperties();
@@ -282,6 +284,37 @@ public class KinesisConfigUtilTest {
List<String> streams = Arrays.asList("stream1", "stream2");
KinesisConfigUtil.validateEfoConfiguration(testConfig, streams);
}
+
+ @Test
+ public void testValidateEfoMaxConcurrency() {
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "55");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testValidateEfoMaxConcurrencyNonNumeric() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for EFO HTTP client max concurrency. Must be positive.");
+
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "abc");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testValidateEfoMaxConcurrencyNegative() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for EFO HTTP client max concurrency. Must be positive.");
+
+ Properties testConfig = TestUtils.getStandardProperties();
+ testConfig.setProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY, "-1");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
// ----------------------------------------------------------------------
// validateConsumerConfiguration() tests
// ----------------------------------------------------------------------