Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:54:39 UTC

[GitHub] [flink] dannycranmer commented on a change in pull request #13005: Feature/flink 18661 efo de registration

dannycranmer commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r461532730



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -362,6 +440,10 @@ protected KinesisDataFetcher(List<String> streams,
 		this.watermarkTracker = watermarkTracker;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
+		this.kinesisProxyV2Factory = checkNotNull(kinesisProxyV2Factory);
+		if (shouldRegisterConsumerEagerly()) {

Review comment:
       @xiaolong-sn `EAGER` should be done in the `FlinkKinesisConsumer` constructor, `LAZY` should be done here

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
 		SequenceNumber lastSequenceNum,
 		MetricGroup metricGroup,
 		KinesisDeserializationSchema<T> shardDeserializer) {
+		return createShardConsumer(
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			metricGroup,
+			shardDeserializer,
+			null
+		);
+	}
+
+	/**
+	 * Create a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamInfoList the stream info used for enhanced fan-out to consume from
+	 * @return shard consumer
+	 */
+	protected ShardConsumer<T> createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer,
+		@Nullable List<FanOutStreamInfo> streamInfoList) {

Review comment:
       @xiaolong-sn why do we need to pass all the stream info? A `ShardConsumer` is only concerned about a single stream

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
##########
@@ -134,6 +134,46 @@
 	 */
 	private final int listStreamConsumersMaxRetries;
 
+	/**
+	 * Max retries for the describe stream operation.
+	 */
+	private final int describeStreamMaxRetries;

Review comment:
       @xiaolong-sn it would make sense to move the `DescribeStream` bits to your config PR since that has not been reviewed yet

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {

Review comment:
       @xiaolong-sn nit: constructors should go above methods
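
   For reference, the ordering I have in mind is fields, then the constructor, then the accessors. A trimmed-down sketch (the class name `FanOutStreamInfoLayout` and the reduced field set are only for illustration, not a proposal for the real class):

```java
// Hypothetical, trimmed-down layout example: fields, then constructor, then accessors.
final class FanOutStreamInfoLayout {
	/** Kinesis stream name. */
	private final String stream;

	/** Registered consumer arn for the related stream. */
	private final String consumerArn;

	// Constructor sits directly below the fields and above every method.
	FanOutStreamInfoLayout(String stream, String consumerArn) {
		this.stream = stream;
		this.consumerArn = consumerArn;
	}

	String getStream() {
		return stream;
	}

	String getConsumerArn() {
		return consumerArn;
	}
}
```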

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {

Review comment:
       @xiaolong-sn I had changed this class so that it does not construct its dependencies, in line with the Dependency Inversion (SOLID) principle (https://github.com/apache/flink/pull/13005/commits/b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a)
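
   A rough sketch of the shape I mean, with the client passed in and the wiring kept in a factory. `AsyncClient`, `InjectedProxy` and `InjectedProxyFactory` are hypothetical stand-ins so the example compiles without the AWS SDK, and the `aws.region` key is only used for illustration:

```java
import java.util.Objects;
import java.util.Properties;

// Stand-in for KinesisAsyncClient so the sketch compiles without the AWS SDK.
interface AsyncClient {
	String describe(String streamName);
}

// The proxy receives its collaborator instead of building it from Properties.
final class InjectedProxy {
	private final AsyncClient client;

	InjectedProxy(AsyncClient client) {
		this.client = Objects.requireNonNull(client);
	}

	String describeStream(String streamName) {
		return client.describe(streamName);
	}
}

// Construction from configuration lives in a factory, outside the proxy.
final class InjectedProxyFactory {
	static InjectedProxy create(Properties configProps) {
		String region = configProps.getProperty("aws.region", "unknown");
		// A real factory would build the SDK client here; this fake just echoes its inputs.
		AsyncClient client = streamName -> "arn:" + region + ":" + streamName;
		return new InjectedProxy(client);
	}
}
```

   With this shape a test can hand the proxy a fake client directly, without subclassing it to override client creation.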
   

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest
+				.builder()
+				.consumerName(consumerName)
+				.streamARN(streamArn)
+				.build();
+			FanOutStreamInfo fanOutStreamInfo = null;
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() && fanOutStreamInfo == null) {
+				try {
+					RegisterStreamConsumerResponse registerStreamConsumerResponse = kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+					fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, registerStreamConsumerResponse.consumer().consumerARN());
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isResourceInUse(ex)) {
+						fanOutStreamInfo = describeStreamConsumer(stream, streamArn, consumerName);
+					} else if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to register " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+
+			if (fanOutStreamInfo == null) {
+				throw new RuntimeException("Retries exceeded for registerStream operation - all " + fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.add(fanOutStreamInfo);
+		}
+		return result;
+	}
+
+	public FanOutStreamInfo describeStreamConsumer(String stream, String streamArn, String consumerName) throws InterruptedException, ExecutionException  {
+		DescribeStreamConsumerRequest describeStreamConsumerRequest = DescribeStreamConsumerRequest
+			.builder()
+			.streamARN(streamArn)
+			.consumerName(consumerName)
+			.build();
+		FanOutStreamInfo fanOutStreamInfo = null;
+		int retryCount = 0;
+		while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() && fanOutStreamInfo == null) {
+			try {
+				DescribeStreamConsumerResponse describeStreamConsumerResponse = kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+				fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, describeStreamConsumerResponse.consumerDescription().consumerARN());
+			} catch (ExecutionException ex) {
+				if (AwsV2Util.isRecoverableException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), retryCount++);
+					LOG.warn("Got recoverable AmazonServiceException when trying to describe stream consumer " + stream + ". Backing off for "
+						+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+					Thread.sleep(backoffMillis);
+				} else {
+					throw ex;
+				}
+			}
+		}
+
+		if (fanOutStreamInfo == null) {
+			throw new RuntimeException("Retries exceeded for registerStream operation - all " + fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+				" retry attempts failed.");
+		}
+		return fanOutStreamInfo;
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void deregisterStreamConsumer(List<FanOutStreamInfo> fanOutStreamInfos) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn again please make this singular (single deregistration at a time)
   
   Also this needs to open with describe and stagger too as per the FLIP flow chart to deal with competing requests
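
   Very roughly, and only as a sketch of the describe-first part: one consumer per call, look it up, and skip the deregistration when a competing task has already removed it. `ConsumerRegistry` and `SingleDeregistration` are hypothetical names standing in for the proxy, and the stagger from the FLIP is deliberately left out here:

```java
import java.util.Optional;

// Hypothetical stand-in for the two proxy operations involved.
interface ConsumerRegistry {
	// Returns the consumer ARN if the consumer is still registered.
	Optional<String> describeStreamConsumer(String streamArn, String consumerName);

	void deregisterStreamConsumer(String consumerArn);
}

final class SingleDeregistration {
	// Handles exactly one stream consumer per call.
	// Returns true if this call issued the deregistration, false if the consumer was already gone.
	static boolean deregister(ConsumerRegistry registry, String streamArn, String consumerName) {
		Optional<String> consumerArn = registry.describeStreamConsumer(streamArn, consumerName);
		if (!consumerArn.isPresent()) {
			return false;
		}
		registry.deregisterStreamConsumer(consumerArn.get());
		return true;
	}
}
```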

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
 		SequenceNumber lastSequenceNum,
 		MetricGroup metricGroup,
 		KinesisDeserializationSchema<T> shardDeserializer) {
+		return createShardConsumer(
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			metricGroup,
+			shardDeserializer,
+			null
+		);
+	}
+
+	/**
+	 * Create a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamInfoList the stream info used for enhanced fan-out to consume from
+	 * @return shard consumer
+	 */
+	protected ShardConsumer<T> createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer,
+		@Nullable List<FanOutStreamInfo> streamInfoList) {
 
 		final KinesisProxyInterface kinesis = kinesisProxyFactory.create(configProps);
 
 		final RecordPublisher recordPublisher = new PollingRecordPublisherFactory()
 			.create(configProps, metricGroup, subscribedShard, kinesis);
 
-		return new ShardConsumer<>(
+		return new ShardConsumer<T>(
 			this,
 			recordPublisher,
 			subscribedShardStateIndex,
 			subscribedShard,
 			lastSequenceNum,
 			new ShardConsumerMetricsReporter(metricGroup),
-			shardDeserializer);
+			shardDeserializer,
+			configProps,

Review comment:
       @xiaolong-sn all these additional properties should not be required once you move `LAZY`. Remember we have a single stream consumer per stream, not per shard. So we should perform the `LAZY` registration in the `KinesisDataFetcher` (once per task manager)
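
   To illustrate the once-per-stream idea, here is a minimal sketch of a fetcher-level cache that registers on the first shard of a stream and reuses the result for the rest. `LazyConsumerRegistration` and its function parameter are hypothetical, not existing connector classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical fetcher-level cache: one registration per stream, shared by all shards.
final class LazyConsumerRegistration {
	private final Map<String, String> consumerArnByStream = new HashMap<>();
	private final Function<String, String> registerStreamConsumer;

	LazyConsumerRegistration(Function<String, String> registerStreamConsumer) {
		this.registerStreamConsumer = registerStreamConsumer;
	}

	// Called when a shard consumer is created; only the first shard of a stream triggers a registration.
	synchronized String consumerArnFor(String stream) {
		return consumerArnByStream.computeIfAbsent(stream, registerStreamConsumer);
	}
}
```

   The shard consumer then only ever needs the one consumer ARN for its own stream, so nothing extra has to be threaded through its constructor.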

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn again I think this should accept a single streamArn/consumerName and loop externally
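
   Something along these lines, as a sketch only: the proxy method handles one stream ARN and one consumer name, and the caller owns the loop. `SingleStreamRegistrar` and `RegistrationLoop` are hypothetical names, and plain strings stand in for the SDK response:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical single-stream contract: one stream ARN and one consumer name per call.
interface SingleStreamRegistrar {
	// Returns the ARN of the registered consumer.
	String registerStreamConsumer(String streamArn, String consumerName) throws Exception;
}

final class RegistrationLoop {
	// The caller iterates over the streams; results follow the map's iteration order.
	static List<String> registerAll(
			SingleStreamRegistrar registrar,
			Map<String, String> streamArns,
			String consumerName) throws Exception {
		List<String> consumerArns = new ArrayList<>();
		for (String streamArn : streamArns.values()) {
			consumerArns.add(registrar.registerStreamConsumer(streamArn, consumerName));
		}
		return consumerArns;
	}
}
```

   Retry and backoff then wrap a single call, and a failure on one stream is easy to attribute.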

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
+		this.stream = stream;
+		this.streamArn = streamArn;
+		this.consumerName = consumerName;
+		this.consumerArn = consumerArn;
+	}
+
+	@Override
+	public boolean equals(Object o) {

Review comment:
       @xiaolong-sn do we need equals/hashCode for this object?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -147,9 +228,31 @@ public void run() {
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
+		} finally {
+			if (this.streamInfoList != null) {
+				try {
+					deregisterStreamConsumer();
+				} catch (Throwable t) {
+					fetcherRef.stopWithError(t);
+				}
+			}
 		}
 	}
 
+	private void deregisterStreamConsumer() throws ExecutionException, InterruptedException {

Review comment:
       @xiaolong-sn this needs to move to `KinesisDataFetcher::shutdownFetcher` 
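
   As a rough sketch of what that could look like: the fetcher remembers the consumers it registered and deregisters each of them once during shutdown. `FetcherShutdownSketch` is a hypothetical stand-in for the relevant part of `KinesisDataFetcher`, not its real API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the shutdown path of the fetcher.
final class FetcherShutdownSketch {
	private final List<String> registeredConsumerArns = new ArrayList<>();
	private final Consumer<String> deregisterStreamConsumer;
	private boolean running = true;

	FetcherShutdownSketch(Consumer<String> deregisterStreamConsumer) {
		this.deregisterStreamConsumer = deregisterStreamConsumer;
	}

	void onConsumerRegistered(String consumerArn) {
		registeredConsumerArns.add(consumerArn);
	}

	// Deregisters every consumer once; repeated calls are no-ops.
	void shutdownFetcher() {
		if (!running) {
			return;
		}
		running = false;
		for (String consumerArn : registeredConsumerArns) {
			deregisterStreamConsumer.accept(consumerArn);
		}
	}
}
```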
   
   

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -22,21 +22,29 @@
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

Review comment:
       @xiaolong-sn I was not expecting any changes to `ShardConsumer`, once you move `LAZY` to `KinesisDataFetcher` this should be unchanged

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;

Review comment:
       @xiaolong-sn Immutable, nice +1

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -248,4 +254,29 @@ static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
 	public static Region getRegion(final Properties configProps) {
 		return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
 	}
+
+	/**
+	 * Whether or not an exception is recoverable.
+	 */
+	public static boolean isRecoverableException(ExecutionException e) {
+		if (!(e.getCause() instanceof SdkException)) {
+			return false;
+		}
+		SdkException ase = (SdkException) e.getCause();
+		return ase instanceof LimitExceededException || ase instanceof ProvisionedThroughputExceededException || ase instanceof ResourceInUseException;

Review comment:
       @xiaolong-sn `ase instanceof ResourceInUseException` can be replaced with `isResourceInUse(ase)`
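
   Roughly like this. It assumes an `isResourceInUse` overload that accepts the cause directly, which may need adding since the call sites in this PR pass the `ExecutionException`. The exception classes below are stand-ins so the sketch compiles without the AWS SDK:

```java
import java.util.concurrent.ExecutionException;

// Stand-ins for the SDK exception types.
class SdkLikeException extends RuntimeException {}
class LimitExceededLike extends SdkLikeException {}
class ResourceInUseLike extends SdkLikeException {}

final class RecoverableChecks {
	// Assumed overload that works on the unwrapped cause.
	static boolean isResourceInUse(Throwable cause) {
		return cause instanceof ResourceInUseLike;
	}

	static boolean isResourceInUse(ExecutionException e) {
		return isResourceInUse(e.getCause());
	}

	static boolean isRecoverableException(ExecutionException e) {
		Throwable cause = e.getCause();
		if (!(cause instanceof SdkLikeException)) {
			return false;
		}
		// Reuse the helper instead of repeating the instanceof check.
		return cause instanceof LimitExceededLike || isResourceInUse(cause);
	}
}
```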

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn I feel like we are throwing a lot of potentially useful information away here, why not return `DescribeStreamResponse` instead and keep it generic? The KinesisProxyV1 returns SDK objects 
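
   A small sketch of the idea: the proxy hands back the whole response for one stream and the caller picks out what it needs. `DescribeResponseLike` is a stand-in for the SDK's `DescribeStreamResponse`, and `StreamDescriber` and `ArnLookup` are hypothetical names:

```java
// Stand-in for the SDK response; the real object carries more than the ARN.
final class DescribeResponseLike {
	final String streamArn;
	final String streamStatus;

	DescribeResponseLike(String streamArn, String streamStatus) {
		this.streamArn = streamArn;
		this.streamStatus = streamStatus;
	}
}

// Hypothetical proxy contract: one stream in, the full response out.
interface StreamDescriber {
	DescribeResponseLike describeStream(String stream);
}

final class ArnLookup {
	// The caller extracts the ARN; other callers remain free to read the status.
	static String streamArn(StreamDescriber proxy, String stream) {
		return proxy.describeStream(stream).streamArn;
	}
}
```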

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {

Review comment:
       @xiaolong-sn Please rename this to `FanOutStreamConsumerInfo`.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.

Review comment:
       @xiaolong-sn The Javadoc is missing `@param` tags for the constructor parameters.
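   
   For illustration, a minimal sketch of the constructor with its parameters documented. The parameter order and the descriptions are assumptions, since the constructor signature is not visible in the hunk above:

```java
/** Sketch of the data class from the diff; the constructor parameter order is an assumption. */
final class FanOutStreamInfo {
	private final String stream;
	private final String streamArn;
	private final String consumerName;
	private final String consumerArn;

	/**
	 * Public constructor for fan out stream info.
	 *
	 * @param stream the Kinesis stream name
	 * @param streamArn the Kinesis stream arn
	 * @param consumerName the registered consumer name for the related stream
	 * @param consumerArn the registered consumer arn for the related stream
	 */
	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
		this.stream = stream;
		this.streamArn = streamArn;
		this.consumerName = consumerName;
		this.consumerArn = consumerArn;
	}

	public String getStream() {
		return stream;
	}

	public String getStreamArn() {
		return streamArn;
	}

	public String getConsumerName() {
		return consumerName;
	}

	public String getConsumerArn() {
		return consumerArn;
	}
}
```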

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       @xiaolong-sn What happens if the stream consumer is already registered? The flowchart in the FLIP calls `ListStreamConsumers` first to establish whether it is already registered. Also, in the `LAZY` case we are meant to introduce a jitter delay up front. Please refer to the flowcharts in the [FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)
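   
   As a rough sketch of the up-front jitter for the `LAZY` case: each task waits a random amount of time before its first registration call, presumably so that parallel tasks do not all call Kinesis at the same moment. The class name, the method name and the `maxDelayMillis` setting are hypothetical and not part of this PR:

```java
import java.util.Random;

/** Hypothetical helper illustrating an initial jitter delay; not part of the PR. */
final class LazyRegistrationJitter {

	/**
	 * Picks a random delay in [0, maxDelayMillis) to wait before the first registration call.
	 * Returns 0 when no positive maximum is configured.
	 */
	static long initialDelayMillis(Random random, long maxDelayMillis) {
		if (maxDelayMillis <= 0) {
			return 0L;
		}
		// nextDouble() is in [0.0, 1.0), so the result stays strictly below maxDelayMillis.
		return (long) (random.nextDouble() * maxDelayMillis);
	}
}
```

   A task could then call `Thread.sleep(LazyRegistrationJitter.initialDelayMillis(seed, maxDelayMillis))` before it describes or registers the consumer.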

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn This method is called `describeStream`, yet it describes multiple streams. Please rename it or split it into two methods:
   - `describeStream`
   - `describeStreams`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       Ah, I see you are using `describeStreamConsumer` instead of list. That is actually better. But I think you should make that call first, as per the flowchart: just replace List with Describe. If the consumer is already registered, it will be 1 call instead of 2 per task.
   
   Describe has a higher TPS too, so that is good!
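   
   A minimal sketch of the describe-first ordering, assuming a hypothetical `ConsumerRegistry` interface that stands in for the proxy calls. Here `Optional.empty()` stands in for the "consumer not found" case; with the real SDK this would have to be mapped from whatever the describe call reports:

```java
import java.util.Optional;

/** Hypothetical stand-in for the two proxy calls; not the actual proxy interface. */
interface ConsumerRegistry {
	/** Returns the consumer ARN if the consumer is already registered, otherwise empty. */
	Optional<String> describeStreamConsumer(String streamArn, String consumerName);

	/** Registers the consumer and returns its ARN. */
	String registerStreamConsumer(String streamArn, String consumerName);
}

final class DescribeFirstRegistration {

	/**
	 * Describes first and only registers when the consumer does not exist yet,
	 * so an already registered consumer costs a single call.
	 */
	static String ensureRegistered(ConsumerRegistry registry, String streamArn, String consumerName) {
		return registry
			.describeStreamConsumer(streamArn, consumerName)
			.orElseGet(() -> registry.registerStreamConsumer(streamArn, consumerName));
	}
}
```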

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn Considering we cannot batch the call, I would opt to just expose a single `describeStream` method and loop externally, removing the Map and the additional complexity around it:
   -  `DescribeStreamResponse describeStream(final String streamName)`
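   
   A sketch of what the external loop could look like. `SingleStreamProxy` and `StreamArnLookup` are hypothetical names, and the response type is left generic so the example compiles without the SDK; with the real SDK the extractor would be `response -> response.streamDescription().streamARN()`, as in the diff above:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the proxy: one call describes exactly one stream. */
interface SingleStreamProxy<R> {
	R describeStream(String streamName);
}

final class StreamArnLookup {

	/**
	 * Caller-side loop over the streams. The proxy returns the full response,
	 * and the caller decides which part of it to keep (here: the stream ARN).
	 */
	static <R> Map<String, String> streamArns(
			List<String> streamNames,
			SingleStreamProxy<R> proxy,
			Function<R, String> arnOf) {
		Map<String, String> result = new LinkedHashMap<>();
		for (String streamName : streamNames) {
			R response = proxy.describeStream(streamName);
			result.put(streamName, arnOf.apply(response));
		}
		return result;
	}
}
```

   This keeps the proxy a thin wrapper around the SDK call, and any retry handling stays per stream inside `describeStream`.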




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org