Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:52:40 UTC

[GitHub] [flink] xiaolong-sn opened a new pull request #13005: Feature/flink 18661 efo de registration

xiaolong-sn opened a new pull request #13005:
URL: https://github.com/apache/flink/pull/13005


   
   ## What is the purpose of the change
   
   *This is a milestone of [FLIP-128](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) to add EFO support to the FlinkKinesisConsumer. 
   The de-/registerStream operations have been introduced.* 
   [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18661)
   
   ## Note
   This PR is blocked by:
   - Test coverage improvements:
     - [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18483)
     - [Pull request](https://github.com/apache/flink/pull/12850)
   - Introducing RecordPublisher.
      - [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18512)
      - [Pull request](https://github.com/apache/flink/pull/12881)
    - Add AWS SDK v2.x dependency and KinesisProxyV2
      - [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18513)
      - [Pull request](https://github.com/apache/flink/pull/12944)
   -  Configuration deserialisation and validation
      - [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18536)
      - [Pull request](https://github.com/apache/flink/pull/12920)
      
    
   
   ## Brief change log
   
     - `ConsumerConstants`
       - *Add the describe stream and describe stream consumer operation parameters.* 
     - `KinesisConfigUtil`
         - *Add validation of the above parameters.*
      - `FanOutStreamInfo`
          - *Add a new data class for storing enhanced fan-out consumer info.*
      - `KinesisDataFetcher`
          - *Add a KinesisProxyV2Interface in order to register stream consumers when the efo registration mode is set to eager.*
      - `ShardConsumer`
        - *Add a KinesisProxyV2Interface in order to de-/register stream consumers when the efo registration mode is set to lazy.*
      - `KinesisProxyV2Interface` & `KinesisProxyV2`
        - *Add the de-/register stream consumer operations here.*
      - `AwsV2Util`
        - *Add several methods to classify exceptions.* 
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *`KinesisProxyV2Test`*
     - *`AwsV2UtilTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no 
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463386948



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
+		this.stream = stream;
+		this.streamArn = streamArn;
+		this.consumerName = consumerName;
+		this.consumerArn = consumerArn;
+	}
+
+	@Override
+	public boolean equals(Object o) {

Review comment:
       I've removed the equals and hashCode functions.







[GitHub] [flink] flinkbot edited a comment on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-665003864


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bf29035a5c92930535a80692d61d0819605c4206",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965",
       "triggerID" : "bf29035a5c92930535a80692d61d0819605c4206",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bf29035a5c92930535a80692d61d0819605c4206 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965) 
   * c55d9c0b24133265d11f05ef84d0d93b46c67bec UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-665003864


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bf29035a5c92930535a80692d61d0819605c4206",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965",
       "triggerID" : "bf29035a5c92930535a80692d61d0819605c4206",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5067",
       "triggerID" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9f1b031f968c039824b8f895f44877de921a3c92",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5077",
       "triggerID" : "9f1b031f968c039824b8f895f44877de921a3c92",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9f1b031f968c039824b8f895f44877de921a3c92 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5077) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463386695



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.

Review comment:
       I added it.







[GitHub] [flink] flinkbot edited a comment on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-665003864


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bf29035a5c92930535a80692d61d0819605c4206",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965",
       "triggerID" : "bf29035a5c92930535a80692d61d0819605c4206",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5067",
       "triggerID" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c55d9c0b24133265d11f05ef84d0d93b46c67bec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5067) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385790



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {

Review comment:
       I think you are right.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385345



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
##########
@@ -134,6 +134,46 @@
 	 */
 	private final int listStreamConsumersMaxRetries;
 
+	/**
+	 * Max retries for the describe stream operation.
+	 */
+	private final int describeStreamMaxRetries;

Review comment:
       I'll do that later.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463452805



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
 		SequenceNumber lastSequenceNum,
 		MetricGroup metricGroup,
 		KinesisDeserializationSchema<T> shardDeserializer) {
+		return createShardConsumer(
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			metricGroup,
+			shardDeserializer,
+			null
+		);
+	}
+
+	/**
+	 * Create a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamInfoList the stream info used for enhanced fan-out to consume from
+	 * @return shard consumer
+	 */
+	protected ShardConsumer<T> createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer,
+		@Nullable List<FanOutStreamInfo> streamInfoList) {
 
 		final KinesisProxyInterface kinesis = kinesisProxyFactory.create(configProps);
 
 		final RecordPublisher recordPublisher = new PollingRecordPublisherFactory()
 			.create(configProps, metricGroup, subscribedShard, kinesis);
 
-		return new ShardConsumer<>(
+		return new ShardConsumer<T>(
 			this,
 			recordPublisher,
 			subscribedShardStateIndex,
 			subscribedShard,
 			lastSequenceNum,
 			new ShardConsumerMetricsReporter(metricGroup),
-			shardDeserializer);
+			shardDeserializer,
+			configProps,

Review comment:
       Solved.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -22,21 +22,29 @@
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

Review comment:
       Solved







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385236



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       I changed the interface accordingly.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463384576



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -248,4 +254,29 @@ static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
 	public static Region getRegion(final Properties configProps) {
 		return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
 	}
+
+	/**
+	 * Whether or not an exception is recoverable.
+	 */
+	public static boolean isRecoverableException(ExecutionException e) {
+		if (!(e.getCause() instanceof SdkException)) {
+			return false;
+		}
+		SdkException ase = (SdkException) e.getCause();
+		return ase instanceof LimitExceededException || ase instanceof ProvisionedThroughputExceededException || ase instanceof ResourceInUseException;

Review comment:
       I re-read the code and found that `ResourceInUseException` might not be a recoverable exception, so I removed it.
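
For readers following the thread, the classification after this change can be sketched as below. This is only a minimal illustration, not the code in the PR: the `*Sketch` classes are hypothetical stand-ins for the AWS SDK v2 exception types named in the diff, so that the example compiles without the SDK on the classpath.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical stand-ins for the SDK exception types named in the diff.
class SdkExceptionSketch extends RuntimeException {
    SdkExceptionSketch(String message) {
        super(message);
    }
}

class LimitExceededSketch extends SdkExceptionSketch {
    LimitExceededSketch(String message) {
        super(message);
    }
}

class ProvisionedThroughputExceededSketch extends SdkExceptionSketch {
    ProvisionedThroughputExceededSketch(String message) {
        super(message);
    }
}

class ResourceInUseSketch extends SdkExceptionSketch {
    ResourceInUseSketch(String message) {
        super(message);
    }
}

final class RecoverableExceptionSketch {
    private RecoverableExceptionSketch() {
    }

    /** Returns true only for causes that are worth retrying after a backoff. */
    static boolean isRecoverable(ExecutionException e) {
        Throwable cause = e.getCause();
        if (!(cause instanceof SdkExceptionSketch)) {
            // Anything that is not an SDK exception is treated as fatal.
            return false;
        }
        // The resource-in-use case is deliberately absent, mirroring the review outcome.
        return cause instanceof LimitExceededSketch
            || cause instanceof ProvisionedThroughputExceededSketch;
    }
}
```

With this shape, a caller would retry only when `isRecoverable` returns true and rethrow otherwise.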







[GitHub] [flink] xiaolong-sn closed pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn closed pull request #13005:
URL: https://github.com/apache/flink/pull/13005


   





[GitHub] [flink] flinkbot edited a comment on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-665003864


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bf29035a5c92930535a80692d61d0819605c4206",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965",
       "triggerID" : "bf29035a5c92930535a80692d61d0819605c4206",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bf29035a5c92930535a80692d61d0819605c4206 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dannycranmer commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r461532730



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -362,6 +440,10 @@ protected KinesisDataFetcher(List<String> streams,
 		this.watermarkTracker = watermarkTracker;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
+		this.kinesisProxyV2Factory = checkNotNull(kinesisProxyV2Factory);
+		if (shouldRegisterConsumerEagerly()) {

Review comment:
       @xiaolong-sn `EAGER` should be done in the `FlinkKinesisConsumer` constructor, `LAZY` should be done here
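
The split suggested in this comment might look roughly like the sketch below. All names here (`RegistrationModeSketch`, `ConsumerSketch`, `FetcherSketch`) are hypothetical and only illustrate where each mode would trigger registration; the real classes take many more arguments and call the Kinesis proxy instead of appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum standing in for the configured EFO registration mode.
enum RegistrationModeSketch { EAGER, LAZY }

// Stand-in for the data fetcher: registers only in LAZY mode.
final class FetcherSketch {
    FetcherSketch(RegistrationModeSketch mode, List<String> log) {
        if (mode == RegistrationModeSketch.LAZY) {
            log.add("registered in fetcher");
        }
    }
}

// Stand-in for the consumer: registers in its constructor only in EAGER mode.
final class ConsumerSketch {
    private final RegistrationModeSketch mode;
    private final List<String> log = new ArrayList<>();

    ConsumerSketch(RegistrationModeSketch mode) {
        this.mode = mode;
        if (mode == RegistrationModeSketch.EAGER) {
            log.add("registered in consumer constructor");
        }
    }

    FetcherSketch createFetcher() {
        return new FetcherSketch(mode, log);
    }

    List<String> registrationLog() {
        return log;
    }
}
```

In this arrangement each mode registers exactly once: `EAGER` when the consumer object is constructed, `LAZY` only once the fetcher is created.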

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
 		SequenceNumber lastSequenceNum,
 		MetricGroup metricGroup,
 		KinesisDeserializationSchema<T> shardDeserializer) {
+		return createShardConsumer(
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			metricGroup,
+			shardDeserializer,
+			null
+		);
+	}
+
+	/**
+	 * Create a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamInfoList the stream info used for enhanced fan-out to consume from
+	 * @return shard consumer
+	 */
+	protected ShardConsumer<T> createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer,
+		@Nullable List<FanOutStreamInfo> streamInfoList) {

Review comment:
       @xiaolong-sn why do we need to pass all the stream info? A `ShardConsumer` is only concerned about a single stream
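
One way to act on this comment is to resolve the single matching entry before constructing the shard consumer and pass only that. The following is a sketch under assumptions: `StreamInfoSketch` and `StreamInfoLookupSketch` are hypothetical names, and the lookup by stream name is an illustration rather than the approach the PR ended up taking.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for the stream info data class.
final class StreamInfoSketch {
    private final String stream;
    private final String consumerArn;

    StreamInfoSketch(String stream, String consumerArn) {
        this.stream = stream;
        this.consumerArn = consumerArn;
    }

    String getStream() {
        return stream;
    }

    String getConsumerArn() {
        return consumerArn;
    }
}

final class StreamInfoLookupSketch {
    private StreamInfoLookupSketch() {
    }

    /** Picks the entry for one stream so that a shard consumer never sees the full list. */
    static Optional<StreamInfoSketch> forStream(List<StreamInfoSketch> all, String stream) {
        return all.stream()
            .filter(info -> info.getStream().equals(stream))
            .findFirst();
    }
}
```

The caller would then hand the resolved entry (or nothing, when enhanced fan-out is not in use) to the shard consumer.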

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
##########
@@ -134,6 +134,46 @@
 	 */
 	private final int listStreamConsumersMaxRetries;
 
+	/**
+	 * Max retries for the describe stream operation.
+	 */
+	private final int describeStreamMaxRetries;

Review comment:
       @xiaolong-sn it would make sense to move the `DescribeStream` bits to your config PR since that has not been reviewed yet

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {

Review comment:
       @xiaolong-sn nit: constructors should go above methods

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {

Review comment:
       @xiaolong-sn I had changed this class to not construct its dependencies, in line with the Dependency Inversion (SOLID) principle (https://github.com/apache/flink/pull/13005/commits/b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a)
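
   A minimal sketch of the constructor-injection shape referred to above. `StreamDescriber` and `InjectedProxy` are hypothetical stand-ins (not the real `KinesisAsyncClient` or `KinesisProxyV2`), used only so the example compiles without the AWS SDK.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the SDK's KinesisAsyncClient, reduced to one call.
interface StreamDescriber {
    CompletableFuture<String> describeStreamArn(String streamName);
}

// The proxy is handed its collaborator; it never builds a client from Properties.
final class InjectedProxy {
    private final StreamDescriber client;

    InjectedProxy(StreamDescriber client) {
        this.client = Objects.requireNonNull(client, "client");
    }

    // Blocks until the async call completes and returns the stream ARN.
    String streamArn(String streamName) {
        return client.describeStreamArn(streamName).join();
    }
}
```

   With this shape a test can pass a lambda or a mock as the client, and the factory that reads `Properties` can live outside the proxy.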
   

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest
+				.builder()
+				.consumerName(consumerName)
+				.streamARN(streamArn)
+				.build();
+			FanOutStreamInfo fanOutStreamInfo = null;
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() && fanOutStreamInfo == null) {
+				try {
+					RegisterStreamConsumerResponse registerStreamConsumerResponse = kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+					fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, registerStreamConsumerResponse.consumer().consumerARN());
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isResourceInUse(ex)) {
+						fanOutStreamInfo = describeStreamConsumer(stream, streamArn, consumerName);
+					} else if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to register " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+
+			if (fanOutStreamInfo == null) {
+				throw new RuntimeException("Retries exceeded for registerStream operation - all " + fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.add(fanOutStreamInfo);
+		}
+		return result;
+	}
+
+	public FanOutStreamInfo describeStreamConsumer(String stream, String streamArn, String consumerName) throws InterruptedException, ExecutionException  {
+		DescribeStreamConsumerRequest describeStreamConsumerRequest = DescribeStreamConsumerRequest
+			.builder()
+			.streamARN(streamArn)
+			.consumerName(consumerName)
+			.build();
+		FanOutStreamInfo fanOutStreamInfo = null;
+		int retryCount = 0;
+		while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() && fanOutStreamInfo == null) {
+			try {
+				DescribeStreamConsumerResponse describeStreamConsumerResponse = kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+				fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, describeStreamConsumerResponse.consumerDescription().consumerARN());
+			} catch (ExecutionException ex) {
+				if (AwsV2Util.isRecoverableException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), retryCount++);
+					LOG.warn("Got recoverable AmazonServiceException when trying to describe stream consumer " + stream + ". Backing off for "
+						+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+					Thread.sleep(backoffMillis);
+				} else {
+					throw ex;
+				}
+			}
+		}
+
+		if (fanOutStreamInfo == null) {
+			throw new RuntimeException("Retries exceeded for registerStream operation - all " + fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+				" retry attempts failed.");
+		}
+		return fanOutStreamInfo;
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void deregisterStreamConsumer(List<FanOutStreamInfo> fanOutStreamInfos) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn again please make this singular (single deregistration at a time)
   
   Also, this needs to open with a describe call and stagger, as per the FLIP flow chart, to deal with competing requests

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
 		SequenceNumber lastSequenceNum,
 		MetricGroup metricGroup,
 		KinesisDeserializationSchema<T> shardDeserializer) {
+		return createShardConsumer(
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			metricGroup,
+			shardDeserializer,
+			null
+		);
+	}
+
+	/**
+	 * Create a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamInfoList the stream info used for enhanced fan-out to consume from
+	 * @return shard consumer
+	 */
+	protected ShardConsumer<T> createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer,
+		@Nullable List<FanOutStreamInfo> streamInfoList) {
 
 		final KinesisProxyInterface kinesis = kinesisProxyFactory.create(configProps);
 
 		final RecordPublisher recordPublisher = new PollingRecordPublisherFactory()
 			.create(configProps, metricGroup, subscribedShard, kinesis);
 
-		return new ShardConsumer<>(
+		return new ShardConsumer<T>(
 			this,
 			recordPublisher,
 			subscribedShardStateIndex,
 			subscribedShard,
 			lastSequenceNum,
 			new ShardConsumerMetricsReporter(metricGroup),
-			shardDeserializer);
+			shardDeserializer,
+			configProps,

Review comment:
       @xiaolong-sn all these additional properties should not be required once you move `LAZY`. Remember we have a single stream consumer per stream, not per shard, so we should perform the lazy registration in the `KinesisDataFetcher` (once per task manager)
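
   A rough sketch of "once per stream, not per shard", assuming a fetcher-level cache. `LazyConsumerRegistry` and its `registerCall` function are hypothetical names for illustration and are not part of the connector.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical fetcher-level cache: one consumer ARN per stream, resolved on first use.
final class LazyConsumerRegistry {
    private final Map<String, String> consumerArnByStream = new ConcurrentHashMap<>();
    private final Function<String, String> registerCall;

    // registerCall stands in for "register (or describe) the stream consumer".
    LazyConsumerRegistry(Function<String, String> registerCall) {
        this.registerCall = registerCall;
    }

    // Every shard of the same stream shares the single registration result.
    String consumerArnFor(String stream) {
        return consumerArnByStream.computeIfAbsent(stream, registerCall);
    }
}
```

   Shard consumers would then only need the resolved consumer ARN for their own stream, rather than the configuration and the full stream info list.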

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn again I think this should accept a single streamArn/consumerName and loop externally
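
   A small sketch of the suggested split, assuming the proxy exposes a single-stream call and the caller owns the loop. `ConsumerRegistrar` and `RegistrationLoop` are hypothetical names, and the returned `String` stands in for whatever the real call would return.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical single-stream operation: one streamArn and one consumerName per call.
interface ConsumerRegistrar {
    String registerStreamConsumer(String streamArn, String consumerName);
}

final class RegistrationLoop {
    // The caller iterates; the result maps stream name to consumer ARN.
    static Map<String, String> registerAll(
            Map<String, String> streamArns,
            String consumerName,
            ConsumerRegistrar registrar) {
        Map<String, String> consumerArns = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : streamArns.entrySet()) {
            String consumerArn = registrar.registerStreamConsumer(entry.getValue(), consumerName);
            consumerArns.put(entry.getKey(), consumerArn);
        }
        return consumerArns;
    }
}
```

   Keeping the proxy method singular means retry and backoff apply to one request at a time, and a failure on one stream does not have to be untangled from the others inside the proxy.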

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
+		this.stream = stream;
+		this.streamArn = streamArn;
+		this.consumerName = consumerName;
+		this.consumerArn = consumerArn;
+	}
+
+	@Override
+	public boolean equals(Object o) {

Review comment:
       @xiaolong-sn do we need equals/hashCode for this object?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -147,9 +228,31 @@ public void run() {
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
+		} finally {
+			if (this.streamInfoList != null) {
+				try {
+					deregisterStreamConsumer();
+				} catch (Throwable t) {
+					fetcherRef.stopWithError(t);
+				}
+			}
 		}
 	}
 
+	private void deregisterStreamConsumer() throws ExecutionException, InterruptedException {

Review comment:
       @xiaolong-sn this needs to move to `KinesisDataFetcher::shutdownFetcher` 
   
   

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -22,21 +22,29 @@
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

Review comment:
       @xiaolong-sn I was not expecting any changes to `ShardConsumer`; once you move `LAZY` to `KinesisDataFetcher`, this should be unchanged

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;

Review comment:
       @xiaolong-sn Immutable, nice +1

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -248,4 +254,29 @@ static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
 	public static Region getRegion(final Properties configProps) {
 		return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
 	}
+
+	/**
+	 * Whether or not an exception is recoverable.
+	 */
+	public static boolean isRecoverableException(ExecutionException e) {
+		if (!(e.getCause() instanceof SdkException)) {
+			return false;
+		}
+		SdkException ase = (SdkException) e.getCause();
+		return ase instanceof LimitExceededException || ase instanceof ProvisionedThroughputExceededException || ase instanceof ResourceInUseException;

Review comment:
       @xiaolong-sn `ase instanceof ResourceInUseException` can be replaced with `isResourceInUse(ase)`
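
   A sketch of the suggested reuse. The exception classes below are hypothetical stand-ins for the SDK types, and the `isResourceInUse(Throwable)` signature is an assumption; the PR calls the helper with an `ExecutionException`, so the real signature may differ.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical stand-ins for SdkException and two of its Kinesis subclasses.
class SdkLikeException extends RuntimeException {}
class LimitExceededLike extends SdkLikeException {}
class ResourceInUseLike extends SdkLikeException {}

final class RecoverableCheck {
    // Single place that knows how "resource in use" is detected.
    static boolean isResourceInUse(Throwable cause) {
        return cause instanceof ResourceInUseLike;
    }

    // Delegates to isResourceInUse instead of repeating the instanceof check.
    static boolean isRecoverableException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (!(cause instanceof SdkLikeException)) {
            return false;
        }
        return cause instanceof LimitExceededLike || isResourceInUse(cause);
    }
}
```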

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn I feel like we are throwing a lot of potentially useful information away here. Why not return `DescribeStreamResponse` instead and keep it generic? The KinesisProxyV1 returns SDK objects.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {

Review comment:
       @xiaolong-sn This should be renamed to `FanOutStreamConsumerInfo` please

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.

Review comment:
       @xiaolong-sn no javadoc for parameters
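
   For illustration, a sketch of the class with the constructor placed above the getters and `@param` tags added. The parameter descriptions are taken from the field comments in the diff; the layout is only one possible arrangement.

```java
/**
 * Data class describing a stream and its registered enhanced fan-out consumer.
 */
class FanOutStreamInfo {
    private final String stream;
    private final String streamArn;
    private final String consumerName;
    private final String consumerArn;

    /**
     * Creates a new fan-out stream info.
     *
     * @param stream the Kinesis stream name
     * @param streamArn the Kinesis stream arn
     * @param consumerName the registered consumer name for the related stream
     * @param consumerArn the registered consumer arn for the related stream
     */
    FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
        this.stream = stream;
        this.streamArn = streamArn;
        this.consumerName = consumerName;
        this.consumerArn = consumerArn;
    }

    /** Return the Kinesis stream name. */
    String getStream() {
        return stream;
    }

    /** Return the Kinesis stream arn. */
    String getStreamArn() {
        return streamArn;
    }

    /** Return the Kinesis consumer name for an enhanced fan-out consumer. */
    String getConsumerName() {
        return consumerName;
    }

    /** Return the Kinesis consumer arn for an enhanced fan-out consumer. */
    String getConsumerArn() {
        return consumerArn;
    }
}
```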

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       @xiaolong-sn What happens if the stream consumer is already registered? The flow chart in the FLIP calls `ListStreamConsumers` to establish whether it is already registered. Also, in the Lazy case we are meant to introduce a jitter delay up front. Please refer to the flowcharts in the [FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)
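
   To make the jitter point concrete, here is a rough sketch of one common shape for these delays. The class `JitterSketch` and the helper `initialJitterDelay` are hypothetical names, and `fullJitterBackoff` here only mirrors the argument order used in this PR (base, max, exponent constant, attempt); it is not claimed to be the PR's implementation.

```java
import java.util.Random;

/** Illustrative jitter helpers; names and signatures are assumptions, not the PR's code. */
final class JitterSketch {

    /** Full-jitter backoff: a uniform random delay in [0, min(max, base * power^attempt)). */
    static long fullJitterBackoff(Random random, long baseMillis, long maxMillis, double power, int attempt) {
        long cap = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (random.nextDouble() * cap);
    }

    /** Up-front delay for lazy registration: a uniform random delay in [0, maxDelayMillis). */
    static long initialJitterDelay(Random random, long maxDelayMillis) {
        return (long) (random.nextDouble() * maxDelayMillis);
    }
}
```

   With lazy registration every task would sleep for `initialJitterDelay(...)` before its first call, so the tasks do not all hit the service at the same instant.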

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn This method is called `describeStream`, yet it describes multiple streams. Please rename it or split it into two methods:
   - `describeStream`
   - `describeStreams`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       Ah, I see you are using `describeStreamConsumer` instead of `ListStreamConsumers`. That is actually better. But I think you should make that call first, as per the flow chart; just replace List with Describe. If the consumer is already registered, that is 1 call instead of 2 per task.
   
   Describe also has a higher TPS limit, so that is good!
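
   Roughly what I mean by describing first, as a sketch only. `ConsumerApi` is a hypothetical stand-in for the two Kinesis calls, not the AWS SDK interface, and retries and backoff are left out for brevity.

```java
import java.util.Optional;

/** Hypothetical stand-in for the Kinesis calls used here; not the AWS SDK API. */
interface ConsumerApi {
    /** Returns the consumer ARN, or empty if no such consumer is registered. */
    Optional<String> describeConsumerArn(String streamArn, String consumerName);

    /** Registers the consumer and returns its ARN. */
    String registerConsumer(String streamArn, String consumerName);
}

final class DescribeFirstRegistrar {
    private final ConsumerApi api;

    DescribeFirstRegistrar(ConsumerApi api) {
        this.api = api;
    }

    /** Describes first and only registers when the consumer does not exist yet. */
    String ensureRegistered(String streamArn, String consumerName) {
        return api.describeConsumerArn(streamArn, consumerName)
            .orElseGet(() -> api.registerConsumer(streamArn, consumerName));
    }
}
```

   Two tasks can still both see "not registered" and race on the register call, so the existing `isResourceInUse` fallback would still be needed around the register step.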

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn Considering we cannot batch these calls, I would opt to expose a single-stream `describeStream` method and loop externally, which removes the Map and the additional complexity around it:
   -  `DescribeStreamResponse describeStream(final String streamName)`
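
   A sketch of the caller-side loop, under simplifying assumptions: the single-stream call is modelled as a plain `Function<String, String>` from stream name to stream ARN rather than returning `DescribeStreamResponse`, and `StreamArnLookup` is a hypothetical helper name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Illustrative caller-side loop; the proxy itself only exposes a single-stream call. */
final class StreamArnLookup {

    /** Invokes the single-stream describe function once per stream name. */
    static Map<String, String> describeAll(List<String> streams, Function<String, String> describeStreamArn) {
        Map<String, String> arns = new LinkedHashMap<>();
        for (String stream : streams) {
            arns.put(stream, describeStreamArn.apply(stream));
        }
        return arns;
    }
}
```

   The proxy then stays a thin wrapper around one service call, and the retry loop only ever deals with one stream at a time.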




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463384647



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest
+				.builder()
+				.consumerName(consumerName)
+				.streamARN(streamArn)
+				.build();
+			FanOutStreamInfo fanOutStreamInfo = null;
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() && fanOutStreamInfo == null) {
+				try {
+					RegisterStreamConsumerResponse registerStreamConsumerResponse = kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+					fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, registerStreamConsumerResponse.consumer().consumerARN());
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isResourceInUse(ex)) {
+						fanOutStreamInfo = describeStreamConsumer(stream, streamArn, consumerName);
+					} else if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to register " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+
+			if (fanOutStreamInfo == null) {
+				throw new RuntimeException("Retries exceeded for registerStream operation - all " + fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.add(fanOutStreamInfo);
+		}
+		return result;
+	}
+
+	public FanOutStreamInfo describeStreamConsumer(String stream, String streamArn, String consumerName) throws InterruptedException, ExecutionException  {
+		DescribeStreamConsumerRequest describeStreamConsumerRequest = DescribeStreamConsumerRequest
+			.builder()
+			.streamARN(streamArn)
+			.consumerName(consumerName)
+			.build();
+		FanOutStreamInfo fanOutStreamInfo = null;
+		int retryCount = 0;
+		while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() && fanOutStreamInfo == null) {
+			try {
+				DescribeStreamConsumerResponse describeStreamConsumerResponse = kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+				fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, describeStreamConsumerResponse.consumerDescription().consumerARN());
+			} catch (ExecutionException ex) {
+				if (AwsV2Util.isRecoverableException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), retryCount++);
+					LOG.warn("Got recoverable AmazonServiceException when trying to describe stream consumer " + stream + ". Backing off for "
+						+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+					Thread.sleep(backoffMillis);
+				} else {
+					throw ex;
+				}
+			}
+		}
+
+		if (fanOutStreamInfo == null) {
+			throw new RuntimeException("Retries exceeded for describeStreamConsumer operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() +
+				" retry attempts failed.");
+		}
+		return fanOutStreamInfo;
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void deregisterStreamConsumer(List<FanOutStreamInfo> fanOutStreamInfos) throws InterruptedException, ExecutionException {

Review comment:
       About the `deregisterStreamConsumer` operation: why can’t we simply deregister first and, if the exception it throws is `ResourceNotFoundException`, end the operation there (since that indicates the consumer has already been deregistered by someone else)?
   
   That way, we do not have to call the `describeStreamConsumer` operation at all.
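
   A sketch of the idea, with hypothetical stand-ins: `ConsumerNotFoundException` plays the role of the service's `ResourceNotFoundException`, and `ConsumerDeregistrar` stands in for the deregister call. In the real proxy the error would presumably arrive wrapped in an `ExecutionException`, so a check similar to `AwsV2Util.isResourceInUse` would be needed; that helper is an assumption and is not shown here.

```java
/** Hypothetical stand-in for the service's "consumer does not exist" error. */
final class ConsumerNotFoundException extends RuntimeException {
    ConsumerNotFoundException(String message) {
        super(message);
    }
}

/** Hypothetical stand-in for the deregister call; not the AWS SDK API. */
interface ConsumerDeregistrar {
    void deregister(String consumerArn);
}

final class DeregisterFirst {

    /** Returns true if this call removed the consumer, false if it was already gone. */
    static boolean deregisterIfPresent(ConsumerDeregistrar api, String consumerArn) {
        try {
            api.deregister(consumerArn);
            return true;
        } catch (ConsumerNotFoundException alreadyGone) {
            // Another task deregistered the consumer first; nothing left to do.
            return false;
        }
    }
}
```

   Any other failure still propagates, so the usual retry and backoff handling can wrap this call unchanged.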







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385433



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {

Review comment:
       OK







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463452532



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -147,9 +228,31 @@ public void run() {
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
+		} finally {
+			if (this.streamInfoList != null) {
+				try {
+					deregisterStreamConsumer();
+				} catch (Throwable t) {
+					fetcherRef.stopWithError(t);
+				}
+			}
 		}
 	}
 
+	private void deregisterStreamConsumer() throws ExecutionException, InterruptedException {

Review comment:
       Moved.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463453064



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties configProps) {
 		SequenceNumber lastSequenceNum,
 		MetricGroup metricGroup,
 		KinesisDeserializationSchema<T> shardDeserializer) {
+		return createShardConsumer(
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			metricGroup,
+			shardDeserializer,
+			null
+		);
+	}
+
+	/**
+	 * Create a new shard consumer.
+	 * Override this method to customize shard consumer behavior in subclasses.
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 * @param metricGroup the metric group to report metrics to
+	 * @param streamInfoList the stream info used for enhanced fan-out to consume from
+	 * @return shard consumer
+	 */
+	protected ShardConsumer<T> createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle subscribedShard,
+		SequenceNumber lastSequenceNum,
+		MetricGroup metricGroup,
+		KinesisDeserializationSchema<T> shardDeserializer,
+		@Nullable List<FanOutStreamInfo> streamInfoList) {

Review comment:
       I reverted the change to ShardConsumer.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385042



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest
+				.builder()
+				.consumerName(consumerName)
+				.streamARN(streamArn)
+				.build();
+			FanOutStreamInfo fanOutStreamInfo = null;
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() && fanOutStreamInfo == null) {
+				try {
+					RegisterStreamConsumerResponse registerStreamConsumerResponse = kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+					fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, registerStreamConsumerResponse.consumer().consumerARN());
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isResourceInUse(ex)) {
+						fanOutStreamInfo = describeStreamConsumer(stream, streamArn, consumerName);
+					} else if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to register " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+
+			if (fanOutStreamInfo == null) {
+				throw new RuntimeException("Retries exceeded for registerStream operation - all " + fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.add(fanOutStreamInfo);
+		}
+		return result;
+	}
+
+	public FanOutStreamInfo describeStreamConsumer(String stream, String streamArn, String consumerName) throws InterruptedException, ExecutionException  {
+		DescribeStreamConsumerRequest describeStreamConsumerRequest = DescribeStreamConsumerRequest
+			.builder()
+			.streamARN(streamArn)
+			.consumerName(consumerName)
+			.build();
+		FanOutStreamInfo fanOutStreamInfo = null;
+		int retryCount = 0;
+		while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() && fanOutStreamInfo == null) {
+			try {
+				DescribeStreamConsumerResponse describeStreamConsumerResponse = kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+				fanOutStreamInfo = new FanOutStreamInfo(stream, streamArn, consumerName, describeStreamConsumerResponse.consumerDescription().consumerARN());
+			} catch (ExecutionException ex) {
+				if (AwsV2Util.isRecoverableException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), retryCount++);
+					LOG.warn("Got recoverable AmazonServiceException when trying to describe stream consumer " + stream + ". Backing off for "
+						+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+					Thread.sleep(backoffMillis);
+				} else {
+					throw ex;
+				}
+			}
+		}
+
+		if (fanOutStreamInfo == null) {
+			throw new RuntimeException("Retries exceeded for describeStreamConsumer operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() +
+				" retry attempts failed.");
+		}
+		return fanOutStreamInfo;
 	}
 
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void deregisterStreamConsumer(List<FanOutStreamInfo> fanOutStreamInfos) throws InterruptedException, ExecutionException {

Review comment:
       I changed the behavior and interface.
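
The three retry loops in the hunk above call `fullJitterBackoff(base, max, expConstant, attempt)`, whose body is not part of this excerpt. Below is a minimal sketch of what such a helper commonly looks like, assuming the usual full-jitter scheme: a uniformly random wait between zero and an exponentially growing, capped delay. The class name `BackoffSketch` is hypothetical, and this is not necessarily the implementation in the pull request.

```java
import java.util.Random;

/** Hypothetical illustration of full-jitter backoff; not taken from the pull request. */
final class BackoffSketch {
	/** Shared random source used to draw the jitter. */
	private static final Random SEED = new Random();

	private BackoffSketch() {
	}

	/**
	 * Returns a random wait time in milliseconds, between 0 and
	 * min(max, base * power^attempt).
	 */
	static long fullJitterBackoff(long base, long max, double power, int attempt) {
		// Grow the delay exponentially with the attempt number, capped at max.
		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
		// Full jitter: pick any point between zero and the capped delay.
		return (long) (SEED.nextDouble() * exponentialBackoff);
	}
}
```

With `base = 100`, `max = 1000` and `power = 2.0`, the third retry waits at most 800 ms and every retry from the fourth onwards waits at most 1000 ms.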




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463386867



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describes all related information when de-/registering streams.
+ */
+public class FanOutStreamInfo {
+	/** Kinesis stream name. */
+	private final String stream;
+
+	/** Kinesis stream arn. */
+	private final String streamArn;
+
+	/** Registered consumer name for the related stream. */
+	private final String consumerName;
+
+	/** Registered consumer arn for the related stream. */
+	private final String consumerArn;
+
+	/**
+	 * Return the Kinesis stream name.
+	 */
+	public String getStream() {
+		return stream;
+	}
+
+	/**
+	 * Return the Kinesis stream arn.
+	 */
+	public String getStreamArn() {
+		return streamArn;
+	}
+
+	/**
+	 * Return the Kinesis consumer name for an enhanced fan-out consumer.
+	 */
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+	 */
+	public String getConsumerArn() {
+		return consumerArn;
+	}
+
+	/**
+	 * Public constructor for fan out stream info.
+	 */
+	public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
+		this.stream = stream;
+		this.streamArn = streamArn;
+		this.consumerName = consumerName;
+		this.consumerArn = consumerArn;
+	}
+
+	@Override
+	public boolean equals(Object o) {

Review comment:
       I was using `equals` for testing purposes only. It looks like all I need is a simple utility function to compare two instances.
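
A test-only comparison helper of the kind mentioned here could look roughly like the sketch below. `StreamInfoSketch` is a hypothetical stand-in that mirrors the four fields of `FanOutStreamInfo` from the diff above so that the example compiles on its own. The names `StreamInfoTestUtils` and `sameContent` are assumptions as well, not code from the pull request.

```java
import java.util.Objects;

/** Hypothetical stand-in carrying the same four fields as FanOutStreamInfo. */
final class StreamInfoSketch {
	final String stream;
	final String streamArn;
	final String consumerName;
	final String consumerArn;

	StreamInfoSketch(String stream, String streamArn, String consumerName, String consumerArn) {
		this.stream = stream;
		this.streamArn = streamArn;
		this.consumerName = consumerName;
		this.consumerArn = consumerArn;
	}
}

/** Hypothetical test utility; keeps comparison logic out of the production class. */
final class StreamInfoTestUtils {
	private StreamInfoTestUtils() {
	}

	/** Returns true when both instances are null or carry the same four values. */
	static boolean sameContent(StreamInfoSketch a, StreamInfoSketch b) {
		if (a == b) {
			return true;
		}
		if (a == null || b == null) {
			return false;
		}
		return Objects.equals(a.stream, b.stream)
			&& Objects.equals(a.streamArn, b.streamArn)
			&& Objects.equals(a.consumerName, b.consumerName)
			&& Objects.equals(a.consumerArn, b.consumerArn);
	}
}
```

Keeping the comparison in a test utility means the data class does not need `equals`/`hashCode` overrides that exist only for tests.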







[GitHub] [flink] flinkbot commented on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-664999614









[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463453684



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {

Review comment:
       I just split it and made the loop external.
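
One way to read "made the loop external" is that the proxy call handles a single stream and the caller iterates over the stream list. The sketch below illustrates that shape only. The resulting interface is not shown in this excerpt, so `ExternalLoopSketch`, `describeAll` and the `describeOne` function parameter are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration: the per-stream call is passed in, the loop lives in the caller. */
final class ExternalLoopSketch {
	private ExternalLoopSketch() {
	}

	/**
	 * Calls describeOne once per stream name and collects the returned ARNs,
	 * keyed by stream name in the order the streams were given.
	 */
	static Map<String, String> describeAll(List<String> streams, Function<String, String> describeOne) {
		Map<String, String> arnsByStream = new LinkedHashMap<>();
		for (String stream : streams) {
			arnsByStream.put(stream, describeOne.apply(stream));
		}
		return arnsByStream;
	}
}
```

With this split, the single-stream call only has to deal with retries for one request, and the caller decides what to do when one stream out of several fails.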







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385155



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {

Review comment:
       I changed the interface accordingly.







[GitHub] [flink] flinkbot edited a comment on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-665003864


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "bf29035a5c92930535a80692d61d0819605c4206",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4965",
       "triggerID" : "bf29035a5c92930535a80692d61d0819605c4206",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5067",
       "triggerID" : "c55d9c0b24133265d11f05ef84d0d93b46c67bec",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9f1b031f968c039824b8f895f44877de921a3c92",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9f1b031f968c039824b8f895f44877de921a3c92",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c55d9c0b24133265d11f05ef84d0d93b46c67bec Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5067) 
   * 9f1b031f968c039824b8f895f44877de921a3c92 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dannycranmer commented on pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on pull request #13005:
URL: https://github.com/apache/flink/pull/13005#issuecomment-669201145


   @xiaolong-sn can you please rename this PR in line with the contribution guide?





[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463453275



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -362,6 +440,10 @@ protected KinesisDataFetcher(List<String> streams,
 		this.watermarkTracker = watermarkTracker;
 		this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
 		this.kinesis = kinesisProxyFactory.create(configProps);
+		this.kinesisProxyV2Factory = checkNotNull(kinesisProxyV2Factory);
+		if (shouldRegisterConsumerEagerly()) {

Review comment:
       I moved the eager registration to `FlinkKinesisConsumer`.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #13005: Feature/flink 18661 efo de registration

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463384942



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxyV2.class);
+
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	private final KinesisAsyncClient kinesisAsyncClient;
 
+	private final FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration;
+
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps, List<String> streams) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+		this.fanOutRecordPublisherConfiguration = new FanOutRecordPublisherConfiguration(configProps, streams);
+	}
+
+	/**
+	 * Creates a Kinesis proxy V2.
+	 *
+	 * @param configProps configuration properties
+	 * @param streams list of kinesis stream names
+	 * @return the created kinesis proxy v2
+	 */
+	public static KinesisProxyV2Interface create(Properties configProps, List<String> streams) {
+		return new KinesisProxyV2(configProps, streams);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * {@inheritDoc}
 	 */
-	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-		this.kinesisAsyncClient = kinesisAsyncClient;
+	@Override
+	public Map<String, String> describeStream(List<String> streams) throws InterruptedException, ExecutionException {
+		Map<String, String> result = new HashMap<>();
+		for (String stream : streams) {
+			DescribeStreamRequest describeStreamRequest = DescribeStreamRequest
+				.builder()
+				.streamName(stream)
+				.build();
+			DescribeStreamResponse describeStreamResponse = null;
+
+			int retryCount = 0;
+			while (retryCount <= fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && describeStreamResponse == null) {
+				try {
+					describeStreamResponse = kinesisAsyncClient.describeStream(describeStreamRequest).get();
+				} catch (ExecutionException ex) {
+					if (AwsV2Util.isRecoverableException(ex)) {
+						long backoffMillis = fullJitterBackoff(
+							fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), retryCount++);
+						LOG.warn("Got recoverable AmazonServiceException when trying to describe stream " + stream + ". Backing off for "
+							+ backoffMillis + " millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+						Thread.sleep(backoffMillis);
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (describeStreamResponse == null) {
+				throw new RuntimeException("Retries exceeded for describeStream operation - all " + fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+					" retry attempts failed.");
+			}
+			result.put(stream, describeStreamResponse.streamDescription().streamARN());
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<FanOutStreamInfo> registerStreamConsumer(Map<String, String> streamArns) throws InterruptedException, ExecutionException {
+		Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+		String consumerName = fanOutRecordPublisherConfiguration.getConsumerName().get();
+		List<FanOutStreamInfo> result = new ArrayList<>();
+		for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+			String stream = entry.getKey();
+			String streamArn = entry.getValue();
+			RegisterStreamConsumerRequest registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       I changed the interface of `registerStreamConsumer`. It was my mistake: I had not fully understood the FLIP before.
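
The version of `registerStreamConsumer` quoted earlier in this thread falls back to `describeStreamConsumer` when the register call fails because the resource is already in use. That way, registering an already-registered consumer still yields its ARN. The sketch below shows the pattern against a hypothetical in-memory registry. `InMemoryRegistry`, `AlreadyRegisteredException` and `registerOrDescribe` are illustrative names, not AWS SDK or Flink APIs, and the ARN format is made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a "resource in use" failure. */
final class AlreadyRegisteredException extends RuntimeException {
	AlreadyRegisteredException(String message) {
		super(message);
	}
}

/** Hypothetical in-memory registry that rejects duplicate registrations. */
final class InMemoryRegistry {
	private final Map<String, String> consumerArns = new HashMap<>();

	/** Registers the consumer and returns its ARN, or throws if it already exists. */
	String register(String streamArn, String consumerName) {
		String key = streamArn + "/" + consumerName;
		if (consumerArns.containsKey(key)) {
			throw new AlreadyRegisteredException(key);
		}
		String consumerArn = streamArn + "/consumer/" + consumerName;
		consumerArns.put(key, consumerArn);
		return consumerArn;
	}

	/** Returns the ARN of an existing consumer, or null if none is registered. */
	String describe(String streamArn, String consumerName) {
		return consumerArns.get(streamArn + "/" + consumerName);
	}
}

final class RegisterOrDescribeSketch {
	private RegisterOrDescribeSketch() {
	}

	/** Tries to register first; if the consumer already exists, looks it up instead. */
	static String registerOrDescribe(InMemoryRegistry registry, String streamArn, String consumerName) {
		try {
			return registry.register(streamArn, consumerName);
		} catch (AlreadyRegisteredException e) {
			return registry.describe(streamArn, consumerName);
		}
	}
}
```

Calling `registerOrDescribe` twice with the same arguments returns the same ARN both times, which makes the operation safe to repeat after a restart.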



