Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/17 09:13:48 UTC

[GitHub] [flink] xiaolong-sn opened a new pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

xiaolong-sn opened a new pull request #12920:
URL: https://github.com/apache/flink/pull/12920


   
   ## What is the purpose of the change
   
   *This is the fifth milestone of [FLIP-128](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) to add enhanced fan-out (EFO) support to the FlinkKinesisConsumer. It introduces the EFO-related configuration options.
   This change should not introduce any functional differences.*
   
   
   ## Brief change log
   
     - `ConsumerConfigConstants`
       - *Add several EFO-related configuration constants.*
     - `KinesisConfigUtil`
       - *Add validation for the EFO-related configuration constants.*
     - `KinesisProxy`
       - *Add fields for operations such as `registerStream`, `deregisterStream`, and `listStream`.*
       - *Give those fields initial values.*
     - `FanOutProperties`
       - *Add a new configuration class for the future `FanOutRecordPublisher`.*
       - *Validate its fields and give them initial values.*
     - `FlinkKinesisConsumer`
       - *Update the call to the validation method to match the changed validation interface.*
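
As an illustration of the kind of validation the change log describes for `KinesisConfigUtil`, here is a minimal sketch of an "optional numeric property" check. It is not the connector's implementation: the class name, the method name, and the property key are hypothetical stand-ins. The diff quoted later in this thread calls helpers named `validateOptionalPositive...Property` while their messages ask for a non-negative value; this sketch follows the message text and accepts zero.

```java
import java.util.Properties;

/** Sketch only: hypothetical stand-in for the validation helpers named in the diff. */
final class OptionalPropertyValidation {

    /** Accepts a missing key; rejects a value that is not an integer or is negative. */
    static void validateOptionalNonNegativeInt(Properties config, String key, String message) {
        String value = config.getProperty(key);
        if (value == null) {
            return; // the property is optional, so there is nothing to validate
        }
        try {
            if (Integer.parseInt(value) < 0) {
                throw new NumberFormatException();
            }
        } catch (NumberFormatException e) {
            // Unparsable and negative values are reported with the same message.
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // Hypothetical key; the real constant values are not shown in this thread.
        config.setProperty("example.registerstream.maxretries", "-1");
        try {
            validateOptionalNonNegativeInt(
                config, "example.registerstream.maxretries", "Must be a valid non-negative integer value.");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Treating a missing key as valid keeps every new setting optional, which fits the statement that the change should not introduce functional differences.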
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *`KinesisConfigUtilTest`*
     - *`FanOutPropertiesTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583) 
   * cb726b5ff64a0d57fa13a5033a354828447c2b99 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457109772



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -159,6 +174,54 @@ public static void validateConsumerConfiguration(Properties config) {
 		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
 			"Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");
 
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+			"Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+			"Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+			"Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+			"Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+			"Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+			"Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.LIST_STREAM_RETRIES,

Review comment:
       Everything has been renamed.
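
The quoted diff validates a retry count, a base backoff, a maximum backoff, and an exponential constant for each operation. How the connector combines them is not shown in this thread. The sketch below is one common way to do it, capped exponential backoff, with a hypothetical class name, method name, and values.

```java
/** Sketch only: one common way to combine base, max and exponential-constant settings. */
final class ExponentialBackoffSketch {

    /** Delay in milliseconds before the given retry attempt (0-based), capped at maxMillis. */
    static long backoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
        double uncapped = baseMillis * Math.pow(power, attempt);
        return (long) Math.min(maxMillis, uncapped);
    }

    public static void main(String[] args) {
        // Hypothetical values: 200 ms base, 1000 ms cap, constant 1.5, five attempts.
        for (int attempt = 0; attempt < 5; attempt++) {
            long delay = backoffMillis(200L, 1000L, 1.5, attempt);
            System.out.println("attempt " + attempt + ": " + delay + " ms");
        }
    }
}
```

A production implementation would often add random jitter to the computed delay so that many clients do not retry at the same instant. It is left out here to keep the sketch deterministic.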







[GitHub] [flink] dannycranmer commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r460710302



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,377 @@
+/*

Review comment:
       @xiaolong-sn can you please move this class to the following location, so that it is consistent with the Polling configuration:
   - `org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutPublisherConfiguration`







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457127316



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties

Review comment:
       Yeah, I see your point. The comments are redundant.







[GitHub] [flink] xiaolong-sn edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-665518978


   I lost my fork of the code, so I have opened a new PR:
   https://github.com/apache/flink/pull/13015





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583) 
   * cb726b5ff64a0d57fa13a5033a354828447c2b99 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658) 
   * 5aef6d842709a1ec308fb8cfe89e0928ebf19f7e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660) 
   





[GitHub] [flink] xiaolong-sn closed pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn closed pull request #12920:
URL: https://github.com/apache/flink/pull/12920


   





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583) 
   * cb726b5ff64a0d57fa13a5033a354828447c2b99 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679",
       "triggerID" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679) 
   





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457138712



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));

Review comment:
       I see your point. I've changed the data structure here.
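
The reviewer's original remark and the revised code are not part of this thread, so the following is only a guess at the kind of change meant: replacing the positional `List<String>` of consumer ARNs with a map keyed by stream name, so that a lookup cannot drift out of step with the stream list. The class name, the method name, and the property prefix are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Sketch only: keeps each consumer ARN next to the stream it belongs to. */
final class StreamConsumerArnLookup {

    // Hypothetical prefix; the real constant value is not shown in this thread.
    static final String EFO_CONSUMER_ARN_PREFIX = "example.efo.consumerarn";

    /** Builds a stream-name to consumer-ARN map, failing fast when an ARN is missing. */
    static Map<String, String> consumerArnsByStream(Properties properties, List<String> streams) {
        Map<String, String> arns = new LinkedHashMap<>();
        for (String stream : streams) {
            String key = EFO_CONSUMER_ARN_PREFIX + "." + stream;
            String arn = properties.getProperty(key);
            if (arn == null) {
                throw new IllegalArgumentException(
                    "Missing consumer ARN for stream '" + stream + "'; expected property " + key);
            }
            arns.put(stream, arn);
        }
        return arns;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Hypothetical stream name and ARN value.
        properties.setProperty(EFO_CONSUMER_ARN_PREFIX + ".orders", "arn:example:orders-consumer");
        System.out.println(consumerArnsByStream(properties, Arrays.asList("orders")));
    }
}
```

Compared with the quoted loop, which adds `properties.getProperty(key)` to the list even when it returns `null`, the map version rejects a missing ARN at construction time.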







[GitHub] [flink] flinkbot commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659986056


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 (Fri Jul 17 09:16:45 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457101518



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {

Review comment:
       I see your point. I'll remove the `Serializable` interface.







[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679",
       "triggerID" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815",
       "triggerID" : "661623165",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4816",
       "triggerID" : "661807806",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4816) 
   





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457122192



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());

Review comment:
       I agree. I've moved this part to a new method.
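
The extracted method itself is not shown in this thread. A minimal sketch of what such a helper could look like follows, using a local stand-in enum with the two values named in the quoted comment (`EFO` and `POLLING`). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Sketch only: a reusable enum-parsing helper with a readable error message. */
final class EnumConfigValidation {

    /** Local stand-in for the connector's enum; values taken from the quoted comment. */
    enum RecordPublisherType { EFO, POLLING }

    /** Parses value as a constant of type, or fails with a message listing the valid names. */
    static <E extends Enum<E>> E parseEnum(Class<E> type, String value, String description) {
        try {
            return Enum.valueOf(type, value);
        } catch (IllegalArgumentException | NullPointerException e) {
            String valid = Arrays.stream(type.getEnumConstants())
                .map(Enum::name)
                .collect(Collectors.joining(", "));
            throw new IllegalArgumentException(
                "Invalid " + description + " set in config. Valid values are: " + valid);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseEnum(RecordPublisherType.class, "EFO", "record publisher type"));
        try {
            parseEnum(RecordPublisherType.class, "PUSH", "record publisher type");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

`Collectors.joining(", ")` also avoids the trailing `", "` that the `StringBuilder` loop in the quoted diff leaves at the end of the message.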







[GitHub] [flink] krin-wang commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
krin-wang commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r458537917



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -255,7 +351,7 @@ public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInCon
 	}
 
 	@Test
-	public void testUnparsableDateForInitialTimestampInConfig() {
+	public void testUnparsableDatEforInitialTimestampInConfig() {

Review comment:
       Typo?







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457137798



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {

Review comment:
       I've split this out into a new method, so there is no `if` here.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457126453



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -277,7 +377,7 @@ public void testIllegalValueForInitialTimestampInConfig() {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
 
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig, new ArrayList<>());

Review comment:
       I've overloaded this method. 
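
As a rough sketch of the overload described here, assuming the single-argument method simply delegates with an empty stream list. The class name `OverloadSketch` and the null checks are placeholders for the example, not the PR's actual validation logic.

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OverloadSketch {

    /** Existing entry point: callers that have no stream list keep using this. */
    public static void validateConsumerConfiguration(Properties config) {
        validateConsumerConfiguration(config, Collections.emptyList());
    }

    /** Assumed two-argument variant that also receives the stream names. */
    public static void validateConsumerConfiguration(Properties config, List<String> streams) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        if (streams == null) {
            throw new IllegalArgumentException("streams must not be null");
        }
        // Stream-specific checks (for example, per-stream consumer ARNs) would go here.
    }
}
```

With this shape, existing tests can keep calling the one-argument form and only the EFO-specific tests need to pass a stream list.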







[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cb726b5ff64a0d57fa13a5033a354828447c2b99 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658) 
   * 5aef6d842709a1ec308fb8cfe89e0928ebf19f7e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dannycranmer edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
dannycranmer edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-660107444


   @xiaolong-sn can you please either squash and repush or update your commit messages as per ["The commit message should point to the corresponding Jira issue by starting with [FLINK-XXXX]."](https://flink.apache.org/contributing/contribute-documentation.html)





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457246071



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FanOutProperties}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FanOutProperties.class)
+public class FanOutPropertiesTest extends TestLogger {
+	@Rule
+	private ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testPollingRecordPublisher() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Only efo record publisher can register a FanOutProperties.");
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.POLLING.toString());
+
+		new FanOutProperties(testConfig, new ArrayList<>());
+	}
+
+	@Test
+	public void testEagerStrategyWithConsumerName() {
+		String fakedConsumerName = "fakedconsumername";
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.EFO.toString());
+		testConfig.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, fakedConsumerName);
+		FanOutProperties fanOutProperties = new FanOutProperties(testConfig, new ArrayList<>());
+		assertEquals(fanOutProperties.getConsumerName(), fakedConsumerName);
+	}
+
+	@Test
+	public void testEagerStrategyWithNoConsumerName() {
+		String msg = "No valid enhanced fan-out consumer name is set through " + ConsumerConfigConstants.EFO_CONSUMER_NAME;
+
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage(msg);
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.EFO.toString());
+		new FanOutProperties(testConfig, new ArrayList<>());
+	}
+
+	@Test
+	public void testNoneStrategyWithStreams() {
+		String fakedStream1 = "fakedstream1";

Review comment:
       I changed the code accordingly.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457122566



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());
+			}
+			if (RecordPublisherType.valueOf(recordPublisherType) == RecordPublisherType.EFO) {
+				String efoRegistrationType;
+				if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+					efoRegistrationType = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+					// specified efo registration type in stream must be either LAZY, EAGER or NONE.
+					try {
+						EFORegistrationType.valueOf(efoRegistrationType);
+					} catch (IllegalArgumentException e) {
+						StringBuilder sb = new StringBuilder();
+						for (EFORegistrationType ert : EFORegistrationType.values()) {
+							sb.append(ert.toString()).append(", ");
+						}
+						throw new IllegalArgumentException("Invalid efo registration type in stream set in config. Valid values are: " + sb.toString());
+					}
+				} else {
+					efoRegistrationType = EFORegistrationType.LAZY.toString();
+				}
+				if (EFORegistrationType.valueOf(efoRegistrationType) == EFORegistrationType.NONE) {
+					//if the registration type is NONE, then for each stream there must be an according consumer ARN
+					for (String stream : streams) {
+						String efoConsumerARNKey = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+						if (!config.containsKey(efoConsumerARNKey)) {
+							throw new IllegalArgumentException("Invalid efo consumer arn settings for not providing " + efoConsumerARNKey);

Review comment:
       I've collected all the missing keys and given a single error message.
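
A minimal sketch of that approach, for illustration only. The class name `MissingArnKeys`, the method name, the prefix value, and the message wording are assumptions; the real prefix is whatever `ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX` defines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class MissingArnKeys {

    // Hypothetical value; stands in for ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX.
    public static final String EFO_CONSUMER_ARN_PREFIX = "example.efo.consumerarn";

    /**
     * Checks that every stream has a consumer ARN key and reports all
     * missing keys in one exception instead of failing on the first.
     */
    public static void validateConsumerArns(Properties config, List<String> streams) {
        List<String> missingKeys = new ArrayList<>();
        for (String stream : streams) {
            String key = EFO_CONSUMER_ARN_PREFIX + "." + stream;
            if (!config.containsKey(key)) {
                missingKeys.add(key);
            }
        }
        if (!missingKeys.isEmpty()) {
            throw new IllegalArgumentException(
                "Missing efo consumer arn settings: " + String.join(", ", missingKeys));
        }
    }
}
```

Reporting every missing key at once lets a user fix the whole configuration in one pass rather than one failed start per stream.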







[GitHub] [flink] dannycranmer commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-660107444


   @xiaolong-sn can you please either squash and repush or update your commit messages as per ["Commit your changes to your local git repository. The commit message should point to the corresponding Jira issue by starting with [FLINK-XXXX]."](https://flink.apache.org/contributing/contribute-documentation.html)





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5aef6d842709a1ec308fb8cfe89e0928ebf19f7e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660) 
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 UNKNOWN
   





[GitHub] [flink] xiaolong-sn commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-661623378


   > @xiaolong-sn can you please either squash and repush or update your commit messages as per ["The commit message should point to the corresponding Jira issue by starting with [FLINK-XXXX]."](https://flink.apache.org/contributing/contribute-documentation.html)
   
   Thanks, I've squashed all changes into one commit.





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583) 
   * cb726b5ff64a0d57fa13a5033a354828447c2b99 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658) 
   * 5aef6d842709a1ec308fb8cfe89e0928ebf19f7e UNKNOWN
   





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457244884



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
 	/* Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/** Base backoff millis for the register stream operation. */
+	private final long registerStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the register stream operation. */
+	private final long registerStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the register stream operation. */
+	private final double registerStreamExpConstant;
+
+	/** Maximum retry attempts for the register stream operation. */
+	private final int registerStreamMaxRetries;
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+	/** Base backoff millis for the deregister stream operation. */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the deregister stream operation. */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the deregister stream operation. */
+	private final double deregisterStreamExpConstant;
+
+	/** Maximum retry attempts for the deregister stream operation. */
+	private final int deregisterStreamMaxRetries;
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings

Review comment:
       renamed it.
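
For readers unfamiliar with the four settings each operation gets in the hunk above (base backoff, maximum backoff, exponential constant, maximum retries), here is a sketch of how such values typically combine into a capped exponential backoff. The class and method names are hypothetical, and no jitter is applied; the actual proxy may randomize the delay.

```java
public class BackoffSketch {

    /**
     * Capped exponential backoff: baseMillis * expConstant^attempt,
     * never larger than maxMillis. Attempt numbering starts at 0.
     */
    public static long backoffMillis(long baseMillis, long maxMillis, double expConstant, int attempt) {
        double exponential = baseMillis * Math.pow(expConstant, attempt);
        return (long) Math.min(maxMillis, exponential);
    }

    /** True while another retry is still allowed under the configured limit. */
    public static boolean shouldRetry(int attempt, int maxRetries) {
        return attempt < maxRetries;
    }
}
```

With a base of 200 ms, a cap of 1000 ms, and a constant of 1.5, the delay grows from 200 ms on the first retry until it reaches the 1000 ms cap.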







[GitHub] [flink] xiaolong-sn commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-665518978


   I lost my fork of the code, so I have opened a new PR.





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457103401



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {

Review comment:
       Thanks, I forgot.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457983934



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +245,72 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	/**
+	 * Validate the record publisher type.
+	 * @param config config properties
+	 * @return if `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is set, return the parsed record publisher type. Else return polling record publisher type.

Review comment:
       I see, I'll use `{@code}` here.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457136749



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -180,6 +182,104 @@ public void testUnrecognizableCredentialProviderTypeInConfig() {
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
+	// ----------------------------------------------------------------------
+	// validateEFOConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testNoRecordPublisherTypeInConfig() {
+		Properties testConfig = TestUtils.getStandardProperties();
+
+		KinesisConfigUtil.validateEFOConfiguration(testConfig, new ArrayList<>());

Review comment:
       I've overloaded the method, so there is no need to pass `new ArrayList<>()` now.







[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5aef6d842709a1ec308fb8cfe89e0928ebf19f7e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4583",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4658",
       "triggerID" : "cb726b5ff64a0d57fa13a5033a354828447c2b99",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660",
       "triggerID" : "5aef6d842709a1ec308fb8cfe89e0928ebf19f7e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679",
       "triggerID" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815",
       "triggerID" : "661623165",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4816",
       "triggerID" : "661807806",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679",
       "triggerID" : "661623165",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679",
       "triggerID" : "661807806",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "68304c8bc1e5ccae037269302bd4c15ea41dc7a8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815",
       "triggerID" : "661807806",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4816) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457243125



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			properties.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+	}
+
+	public void setEfoRegistrationType(EFORegistrationType efoRegistrationType) {

Review comment:
       I've deleted all the setters and made most of the fields `final`.
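
For readers following the thread, the shape described here (no setters, `final` fields assigned once in the constructor) might look roughly like the sketch below. It is not the code from the PR: the class name, the two property keys and the default values are all hypothetical and chosen only for illustration.

```java
import java.util.Properties;

// Illustrative sketch only; not the FanOutProperties class from the PR.
final class ImmutableBackoffConfigSketch {
    // Hypothetical property keys, invented for this example.
    static final String MAX_RETRIES_KEY = "example.subscribe.maxretries";
    static final String BASE_BACKOFF_KEY = "example.subscribe.backoff.base";

    // Every field is final and assigned exactly once, in the constructor.
    private final int maxRetries;
    private final long baseBackoffMillis;

    ImmutableBackoffConfigSketch(Properties properties) {
        // The defaults ("5" and "1000") are assumptions, not the connector's real defaults.
        this.maxRetries = Integer.parseInt(properties.getProperty(MAX_RETRIES_KEY, "5"));
        this.baseBackoffMillis = Long.parseLong(properties.getProperty(BASE_BACKOFF_KEY, "1000"));
    }

    // Getters only: the object cannot change after construction.
    int getMaxRetries() {
        return maxRetries;
    }

    long getBaseBackoffMillis() {
        return baseBackoffMillis;
    }
}
```

Because the values are derived entirely from the `Properties` passed in, an instance can always be rebuilt from the same input, which is one reason setters add little here.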







[GitHub] [flink] flinkbot commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5f259aa1cf05a64249f9ac6edfb6971bb9b33841",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5f259aa1cf05a64249f9ac6edfb6971bb9b33841 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dannycranmer commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-662101593


   @tzulitai ready for your review. Thanks.





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457978644



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings

Review comment:
       Yes, I removed it. 







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457109472



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
 	/* Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
+	// ------------------------------------------------------------------------

Review comment:
       I merged them into `FanOutProperties`.







[GitHub] [flink] dannycranmer commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r456415363



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;

Review comment:
       @xiaolong-sn missing Apache copyright statement

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {

Review comment:
       @xiaolong-sn missing comment ["All public/protected methods and classes must have a Javadoc."](https://flink.apache.org/contributing/code-style-and-quality-formatting.html)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());

Review comment:
       @xiaolong-sn this is general stream consumer validation, yet it sits inside the `validateEFOConfiguration` method. I suggest moving it to a new method and invoking that method from the parent, so that each method keeps a single responsibility.
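
One possible way to arrange the split suggested above is sketched here. This is an illustration under assumptions, not the connector's code: the class name, both property keys and the two enums are hypothetical stand-ins, and treating a missing publisher type as `POLLING` is an assumption made for the example.

```java
import java.util.Arrays;
import java.util.Properties;

// Illustrative sketch only; names and keys are hypothetical.
class ValidationSplitSketch {
    static final String PUBLISHER_TYPE_KEY = "example.recordpublisher";
    static final String REGISTRATION_TYPE_KEY = "example.efo.registration";

    enum PublisherType { EFO, POLLING }

    enum RegistrationType { LAZY, EAGER, NONE }

    /** Parent entry point: runs the general check first, then the EFO-only check. */
    static void validateConsumerConfiguration(Properties config) {
        PublisherType type = validateRecordPublisherType(config);
        if (type == PublisherType.EFO) {
            validateEfoConfiguration(config);
        }
    }

    /** General validation: applies to every consumer, whether or not EFO is used. */
    static PublisherType validateRecordPublisherType(Properties config) {
        // Assumption: an absent key falls back to POLLING.
        String value = config.getProperty(PUBLISHER_TYPE_KEY, PublisherType.POLLING.name());
        try {
            return PublisherType.valueOf(value);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid record publisher type. Valid values are: "
                    + Arrays.toString(PublisherType.values()));
        }
    }

    /** EFO-specific validation: only reached when the publisher type is EFO. */
    static void validateEfoConfiguration(Properties config) {
        String value = config.getProperty(REGISTRATION_TYPE_KEY, RegistrationType.LAZY.name());
        try {
            RegistrationType.valueOf(value);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid efo registration type. Valid values are: "
                    + Arrays.toString(RegistrationType.values()));
        }
    }
}
```

With this layout the EFO method never has to reason about the publisher type itself, and `Arrays.toString` replaces the hand-rolled `StringBuilder` loop, which also avoids the trailing `", "` in the message.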

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;

Review comment:
       @xiaolong-sn Missing Apache copyright 

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());
+			}
+			if (RecordPublisherType.valueOf(recordPublisherType) == RecordPublisherType.EFO) {
+				String efoRegistrationType;
+				if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+					efoRegistrationType = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+					// specified efo registration type in stream must be either LAZY, EAGER or NONE.
+					try {
+						EFORegistrationType.valueOf(efoRegistrationType);
+					} catch (IllegalArgumentException e) {
+						StringBuilder sb = new StringBuilder();
+						for (EFORegistrationType ert : EFORegistrationType.values()) {
+							sb.append(ert.toString()).append(", ");
+						}
+						throw new IllegalArgumentException("Invalid efo registration type in stream set in config. Valid values are: " + sb.toString());
+					}
+				} else {
+					efoRegistrationType = EFORegistrationType.LAZY.toString();
+				}
+				if (EFORegistrationType.valueOf(efoRegistrationType) == EFORegistrationType.NONE) {
+					//if the registration type is NONE, then for each stream there must be an according consumer ARN
+					for (String stream : streams) {
+						String efoConsumerARNKey = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+						if (!config.containsKey(efoConsumerARNKey)) {
+							throw new IllegalArgumentException("Invalid efo consumer arn settings for not providing " + efoConsumerARNKey);

Review comment:
       @xiaolong-sn nit: This is fine; however, consider collecting all missing keys rather than failing fast, so that the user gets a single error message listing every one of them.
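
A minimal sketch of the collect-then-fail idea, assuming a hypothetical key prefix (`example.efo.consumerarn`) and hypothetical class and method names; it is meant to show the pattern rather than the PR's eventual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustrative sketch only; the prefix and names are hypothetical.
class MissingArnKeysSketch {
    static final String ARN_PREFIX = "example.efo.consumerarn";

    /** Returns every per-stream ARN key that is absent from the config, in stream order. */
    static List<String> findMissingArnKeys(Properties config, List<String> streams) {
        List<String> missing = new ArrayList<>();
        for (String stream : streams) {
            String key = ARN_PREFIX + "." + stream;
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Throws once, naming all missing keys, instead of stopping at the first one. */
    static void validateConsumerArns(Properties config, List<String> streams) {
        List<String> missing = findMissingArnKeys(config, streams);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                "Missing efo consumer arn settings: " + String.join(", ", missing));
        }
    }
}
```

A user with three streams and two missing ARNs would then see both keys in one exception, rather than fixing one, restarting the job, and hitting the next.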

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {

Review comment:
       @xiaolong-sn nit: spaces around `:`

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -180,6 +182,104 @@ public void testUnrecognizableCredentialProviderTypeInConfig() {
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
+	// ----------------------------------------------------------------------
+	// validateEFOConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testNoRecordPublisherTypeInConfig() {
+		Properties testConfig = TestUtils.getStandardProperties();
+
+		KinesisConfigUtil.validateEFOConfiguration(testConfig, new ArrayList<>());

Review comment:
       @xiaolong-sn nit: `Collections.emptyList()` is more expressive
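
For context on this nit, a small sketch of how the test fixtures could be expressed; the helper class and method names are hypothetical, while the stream names are taken from the test under review. `Collections.emptyList()` states the intent ("no streams") and returns an immutable list, so a test cannot accidentally add to it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only; the helper class is hypothetical.
class TestListFixturesSketch {
    /** An empty, immutable list: clearer than new ArrayList<>() when nothing is added. */
    static List<String> noStreams() {
        return Collections.emptyList();
    }

    /** A fixed-size list built in one expression instead of several add() calls. */
    static List<String> twoStreams() {
        return Arrays.asList("fakedstream1", "fakedstream2");
    }
}
```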

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {

Review comment:
       @xiaolong-sn does this need to be `Serializable`? I think it is the `Properties` used to construct this class that get serialised, and therefore this class does not need to be.

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FanOutProperties}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FanOutProperties.class)
+public class FanOutPropertiesTest extends TestLogger {
+	@Rule
+	private ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testPollingRecordPublisher() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Only efo record publisher can register a FanOutProperties.");
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.POLLING.toString());
+
+		new FanOutProperties(testConfig, new ArrayList<>());
+	}
+
+	@Test
+	public void testEagerStrategyWithConsumerName() {
+		String fakedConsumerName = "fakedconsumername";
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.EFO.toString());
+		testConfig.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, fakedConsumerName);
+		FanOutProperties fanOutProperties = new FanOutProperties(testConfig, new ArrayList<>());
+		assertEquals(fanOutProperties.getConsumerName(), fakedConsumerName);
+	}
+
+	@Test
+	public void testEagerStrategyWithNoConsumerName() {
+		String msg = "No valid enhanced fan-out consumer name is set through " + ConsumerConfigConstants.EFO_CONSUMER_NAME;
+
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage(msg);
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.EFO.toString());
+		new FanOutProperties(testConfig, new ArrayList<>());
+	}
+
+	@Test
+	public void testNoneStrategyWithStreams() {
+		String fakedStream1 = "fakedstream1";

Review comment:
       @xiaolong-sn these 5 lines can be reduced to `List<String> streams = Arrays.asList("fakedstream1", "fakedstream2");`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
 	/* Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/** Base backoff millis for the register stream operation. */
+	private final long registerStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the register stream operation. */
+	private final long registerStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the register stream operation. */
+	private final double registerStreamExpConstant;
+
+	/** Maximum retry attempts for the register stream operation. */
+	private final int registerStreamMaxRetries;
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+	/** Base backoff millis for the deregister stream operation. */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the deregister stream operation. */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the deregister stream operation. */
+	private final double deregisterStreamExpConstant;
+
+	/** Maximum retry attempts for the deregister stream operation. */
+	private final int deregisterStreamMaxRetries;
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings

Review comment:
       @xiaolong-sn these should be `listStreamConsumers`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
 	/* Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
+	// ------------------------------------------------------------------------

Review comment:
       @xiaolong-sn as discussed, the plan was to implement the de-/registration in the `KinesisProxyV2`. Can we please move these properties to a new configuration class, similar to the `FanOutProperties`, or merge them all into a single configuration class?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -159,6 +174,54 @@ public static void validateConsumerConfiguration(Properties config) {
 		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
 			"Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");
 
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+			"Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+			"Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+			"Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+			"Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+			"Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+			"Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.LIST_STREAM_RETRIES,

Review comment:
       @xiaolong-sn should be list stream consumers (for all the new list properties)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			properties.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+	}
+
+	public void setEfoRegistrationType(EFORegistrationType efoRegistrationType) {

Review comment:
       @xiaolong-sn as the setters are not needed, we should delete them and make the class immutable (imo). Then we can make the fields `final` (["A good general approach is to try and make as many fields of a class final as possible"](https://flink.apache.org/contributing/code-style-and-quality-common.html))
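
A minimal sketch of the immutable shape suggested here. The class `ImmutableFanOutSketch`, its two property keys and the fallback values are hypothetical stand-ins chosen for the example; the real keys and defaults live in `ConsumerConfigConstants`.

```java
import java.io.Serializable;
import java.util.Properties;

// Hypothetical, trimmed-down stand-in for FanOutProperties: every field is final
// and assigned exactly once in the constructor, so no setters are needed.
final class ImmutableFanOutSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Illustrative keys only, not the real configuration keys.
    static final String RETRIES_KEY = "example.subscribe.retries";
    static final String BACKOFF_BASE_KEY = "example.subscribe.backoff.base";

    private final int subscribeToShardMaxRetries;
    private final long subscribeToShardBaseBackoffMillis;

    ImmutableFanOutSketch(Properties properties) {
        // Fallback values "5" and "1000" are assumptions for the example.
        this.subscribeToShardMaxRetries =
            Integer.parseInt(properties.getProperty(RETRIES_KEY, "5"));
        this.subscribeToShardBaseBackoffMillis =
            Long.parseLong(properties.getProperty(BACKOFF_BASE_KEY, "1000"));
    }

    int getSubscribeToShardMaxRetries() {
        return subscribeToShardMaxRetries;
    }

    long getSubscribeToShardBaseBackoffMillis() {
        return subscribeToShardBaseBackoffMillis;
    }
}
```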

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable

Review comment:
       @xiaolong-sn nit: My preference is a new line before each field (before the annotations)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;

Review comment:
       @xiaolong-sn having a `List` means we lose the Stream > ARN relationship. Can you use a `Map` instead where the key is stream and value is ARN?

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -277,7 +377,7 @@ public void testIllegalValueForInitialTimestampInConfig() {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
 
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig, new ArrayList<>());

Review comment:
       @xiaolong-sn consider overloading the method for backwards compatibility
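
Roughly what the overload could look like, as a sketch only. `ConfigValidationSketch` is a hypothetical stand-in for `KinesisConfigUtil`, and passing an empty stream list from the old signature is an assumption about what existing callers would want.

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for KinesisConfigUtil, showing only the overload pattern.
final class ConfigValidationSketch {

    // The existing single-argument signature is kept so current callers still compile.
    static void validateConsumerConfiguration(Properties config) {
        validateConsumerConfiguration(config, Collections.emptyList());
    }

    // New signature that also receives the stream names.
    static void validateConsumerConfiguration(Properties config, List<String> streams) {
        if (config == null) {
            throw new NullPointerException("config can not be null");
        }
        if (streams == null) {
            throw new NullPointerException("streams can not be null");
        }
        // Stream-specific checks (for example the EFO validation) would go here.
    }
}
```

Existing tests could then keep calling the one-argument form instead of passing `new ArrayList<>()`.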

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -66,27 +74,35 @@
 	 **/
 	protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
 
-	/** Default values for RateLimit. **/
+	/**
+	 * Default values for RateLimit.
+	 **/
 	protected static final long DEFAULT_RATE_LIMIT = 100L;
 
-	/** Default value for ThreadingModel. **/
+	/**
+	 * Default value for ThreadingModel.
+	 **/
 	protected static final KinesisProducerConfiguration.ThreadingModel DEFAULT_THREADING_MODEL = KinesisProducerConfiguration.ThreadingModel.POOLED;
 
-	/** Default values for ThreadPoolSize. **/
+	/**
+	 * Default values for ThreadPoolSize.
+	 **/
 	protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
 
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
-	public static void validateConsumerConfiguration(Properties config) {
+	public static void validateConsumerConfiguration(Properties config, List<String> streams) {
 		checkNotNull(config, "config can not be null");
 
 		validateAwsConfiguration(config);
 
+		validateEFOConfiguration(config, streams);

Review comment:
       @xiaolong-sn nit: camelCase applies to acronyms too, so `validateEFOConfiguration` should typically be `validateEfoConfiguration`. This applies to all instances of fields/methods in the PR.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));

Review comment:
       @xiaolong-sn with a map, this becomes `streamConsumerArns.put(stream, properties.getProperty(key));`
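
For illustration, the loop with a `Map` might look like the sketch below. `StreamArnMapSketch` and the prefix value `example.consumerarn` are assumptions for the example; the real prefix is `ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

final class StreamArnMapSketch {
    // Illustrative prefix, not the real configuration value.
    static final String ARN_PREFIX = "example.consumerarn";

    // Keeps each ARN keyed by the stream it belongs to.
    static Map<String, String> readStreamConsumerArns(Properties properties, List<String> streams) {
        Map<String, String> streamConsumerArns = new HashMap<>();
        for (String stream : streams) {
            String key = ARN_PREFIX + "." + stream;
            // A stream without a configured ARN is stored with a null value.
            streamConsumerArns.put(stream, properties.getProperty(key));
        }
        return streamConsumerArns;
    }
}
```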

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {

Review comment:
       @xiaolong-sn Consider ["Avoid deep nesting of scopes, by flipping the if condition and exiting early."](https://flink.apache.org/contributing/code-style-and-quality-common.html)
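
A sketch of the flipped conditions, under stated assumptions: `EarlyExitSketch`, its key `example.recordpublisher` and the boolean return value are hypothetical and exist only to make the control flow visible. The two enum values come from the comment in the diff ("must be either EFO or POLLING").

```java
import java.util.Properties;

final class EarlyExitSketch {
    // Illustrative key, not the real ConsumerConfigConstants value.
    static final String RECORD_PUBLISHER_TYPE = "example.recordpublisher";

    enum RecordPublisherType { EFO, POLLING }

    // Returns true only if the EFO-specific checks were reached.
    static boolean validateEfoConfiguration(Properties config) {
        if (!config.containsKey(RECORD_PUBLISHER_TYPE)) {
            return false; // nothing configured, nothing to validate
        }

        // valueOf throws IllegalArgumentException for an unknown type.
        RecordPublisherType type =
            RecordPublisherType.valueOf(config.getProperty(RECORD_PUBLISHER_TYPE));
        if (type != RecordPublisherType.EFO) {
            return false; // POLLING needs no EFO checks
        }

        // EFO-specific checks would follow here at a single indentation level.
        return true;
    }
}
```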

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties

Review comment:
       @xiaolong-sn some of your comments might violate the "golden rule", ["Golden rule: Comment as much as necessary to support code understanding, but don’t add redundant information."](https://flink.apache.org/contributing/code-style-and-quality-common.html#comments-and-code-readability)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			properties.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+	}
+
+	public void setEfoRegistrationType(EFORegistrationType efoRegistrationType) {
+		this.efoRegistrationType = efoRegistrationType;
+	}
+
+	public void setConsumerName(@Nullable String consumerName) {
+		this.consumerName = consumerName;
+	}
+
+	public void setStreamConsumerArns(@Nullable List<String> streamConsumerArns) {
+		this.streamConsumerArns = streamConsumerArns;
+	}
+
+	public void setSubscribeToShardMaxRetries(int subscribeToShardMaxRetries) {
+		this.subscribeToShardMaxRetries = subscribeToShardMaxRetries;
+	}
+
+	public void setSubscribeToShardMaxBackoffMillis(long subscribeToShardMaxBackoffMillis) {
+		this.subscribeToShardMaxBackoffMillis = subscribeToShardMaxBackoffMillis;
+	}
+
+	public void setSubscribeToShardBaseBackoffMillis(long subscribeToShardBaseBackoffMillis) {
+		this.subscribeToShardBaseBackoffMillis = subscribeToShardBaseBackoffMillis;
+	}
+
+	public void setSubscribeToShardExpConstant(double subscribeToShardExpConstant) {
+		this.subscribeToShardExpConstant = subscribeToShardExpConstant;
+	}
+
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	@Nullable
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	@Nullable
+	public List<String> getStreamConsumerArns() {

Review comment:
       @xiaolong-sn when we have a map we can add a method `public String getStreamConsumerArn(String stream)`
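
The accessor could be as small as the sketch below. `StreamArnLookupSketch` is a hypothetical stand-in for `FanOutProperties`, and returning `null` for an unknown stream is an assumption, not a decision made in this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class StreamArnLookupSketch {
    private final Map<String, String> streamConsumerArns;

    StreamArnLookupSketch(Map<String, String> streamConsumerArns) {
        // Defensive copy so later changes to the caller's map are not visible here.
        this.streamConsumerArns = Collections.unmodifiableMap(new HashMap<>(streamConsumerArns));
    }

    // Returns the ARN configured for the stream, or null if none was configured.
    public String getStreamConsumerArn(String stream) {
        return streamConsumerArns.get(stream);
    }
}
```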

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());
+			}
+			if (RecordPublisherType.valueOf(recordPublisherType) == RecordPublisherType.EFO) {
+				String efoRegistrationType;
+				if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+					efoRegistrationType = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+					// specified efo registration type in stream must be either LAZY, EAGER or NONE.
+					try {
+						EFORegistrationType.valueOf(efoRegistrationType);
+					} catch (IllegalArgumentException e) {
+						StringBuilder sb = new StringBuilder();

Review comment:
       @xiaolong-sn nit: it would be nicer to build this with a stream
   
   ```
   String errorMessage = Arrays.stream(EFORegistrationType.values())
       .map(Enum::name).collect(Collectors.joining(", "));
   ```
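
A self-contained version of the same idea, for reference. `EnumNamesSketch` and its nested enum are stand-ins for the example; the three values are taken from the comment in the diff ("LAZY, EAGER or NONE"), and their declaration order here is an assumption.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class EnumNamesSketch {
    // Stand-in for ConsumerConfigConstants.EFORegistrationType.
    enum EFORegistrationType { LAZY, EAGER, NONE }

    // Joins the enum names with ", " and, unlike the StringBuilder loop,
    // leaves no trailing separator.
    static String validValues() {
        return Arrays.stream(EFORegistrationType.values())
            .map(Enum::name)
            .collect(Collectors.joining(", "));
    }
}
```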




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   ## CI report:
   
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4816) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679) 
   





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457139138



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;

Review comment:
       Yes, I have replaced the `List` with a `Map` now.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r461279105



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,377 @@
+/*

Review comment:
       OK

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -255,7 +351,7 @@ public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInCon
 	}
 
 	@Test
-	public void testUnparsableDateForInitialTimestampInConfig() {
+	public void testUnparsableDatEforInitialTimestampInConfig() {

Review comment:
       Sorry, I think I made a spelling mistake here.







[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457140642



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			properties.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+	}
+
+	public void setEfoRegistrationType(EFORegistrationType efoRegistrationType) {
+		this.efoRegistrationType = efoRegistrationType;
+	}
+
+	public void setConsumerName(@Nullable String consumerName) {
+		this.consumerName = consumerName;
+	}
+
+	public void setStreamConsumerArns(@Nullable List<String> streamConsumerArns) {
+		this.streamConsumerArns = streamConsumerArns;
+	}
+
+	public void setSubscribeToShardMaxRetries(int subscribeToShardMaxRetries) {
+		this.subscribeToShardMaxRetries = subscribeToShardMaxRetries;
+	}
+
+	public void setSubscribeToShardMaxBackoffMillis(long subscribeToShardMaxBackoffMillis) {
+		this.subscribeToShardMaxBackoffMillis = subscribeToShardMaxBackoffMillis;
+	}
+
+	public void setSubscribeToShardBaseBackoffMillis(long subscribeToShardBaseBackoffMillis) {
+		this.subscribeToShardBaseBackoffMillis = subscribeToShardBaseBackoffMillis;
+	}
+
+	public void setSubscribeToShardExpConstant(double subscribeToShardExpConstant) {
+		this.subscribeToShardExpConstant = subscribeToShardExpConstant;
+	}
+
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	@Nullable
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	@Nullable
+	public List<String> getStreamConsumerArns() {

Review comment:
       Yes, I added one. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457984340



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +245,72 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	/**
+	 * Validate the record publisher type.
+	 * @param config config properties
+	 * @return if `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is set, return the parsed record publisher type. Else return polling record publisher type.
+	 */
+	public static RecordPublisherType validateRecordPublisherType(Properties config) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				return RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				String errorMessage = Arrays.stream(RecordPublisherType.values())
+					.map(Enum::name).collect(Collectors.joining(", "));
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + errorMessage);
+			}
+		} else {
+			return RecordPublisherType.POLLING;
+		}
+	}
+
+	/**
+	 * Validate if the given config is a valid EFO configuration.
+	 * @param config  config properties.
+	 * @param streams the streams which is sent to match the EFO consumer arn if the EFO registration mode is set to `NONE`.
+	 */
+	public static void validateEfoConfiguration(Properties config, List<String> streams) {
+		String efoRegistrationType;

Review comment:
       I've got your point. That is brilliant.







[GitHub] [flink] xiaolong-sn commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-661623165


   @flinkbot run azure





[GitHub] [flink] dannycranmer commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457941435



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
##########
@@ -86,6 +125,54 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The power constant for exponential backoff between each listShards attempt. */
 	public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.list.shards.backoff.expconst";
 
+	/** The maximum number of registerStream attempts if we get a recoverable exception. */
+	public static final String REGISTER_STREAM_RETRIES = "flink.stream.registerstreamconsumer.maxretries";
+
+	/** The base backoff time between each registerStream attempt. */
+	public static final String REGISTER_STREAM_BACKOFF_BASE = "flink.stream.registerstreamconsumer.backoff.base";
+
+	/** The maximum backoff time between each registerStream attempt. */
+	public static final String REGISTER_STREAM_BACKOFF_MAX = "flink.stream.registerstreamconsumer.backoff.max";
+
+	/** The power constant for exponential backoff between each registerStream attempt. */
+	public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.registerstreamconsumer.backoff.expconst";
+
+	/** The maximum number of deregisterStream attempts if we get a recoverable exception. */
+	public static final String DEREGISTER_STREAM_RETRIES = "flink.stream.deregisterstreamconsumer.maxretries";
+
+	/** The base backoff time between each deregisterStream attempt. */
+	public static final String DEREGISTER_STREAM_BACKOFF_BASE = "flink.stream.deregisterstreamconsumer.backoff.base";
+
+	/** The maximum backoff time between each deregisterStream attempt. */
+	public static final String DEREGISTER_STREAM_BACKOFF_MAX = "flink.stream.deregisterstreamconsumer.backoff.max";
+
+	/** The power constant for exponential backoff between each deregisterStream attempt. */
+	public static final String DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.deregisterstreamconsumer.backoff.expconst";
+
+	/** The maximum number of listStream attempts if we get a recoverable exception. */

Review comment:
       @xiaolong-sn The comments for the ListStreamConsumers constants are outdated: they still say `listStream`.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
##########
@@ -86,6 +125,54 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The power constant for exponential backoff between each listShards attempt. */
 	public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.list.shards.backoff.expconst";
 
+	/** The maximum number of registerStream attempts if we get a recoverable exception. */
+	public static final String REGISTER_STREAM_RETRIES = "flink.stream.registerstreamconsumer.maxretries";
+
+	/** The base backoff time between each registerStream attempt. */
+	public static final String REGISTER_STREAM_BACKOFF_BASE = "flink.stream.registerstreamconsumer.backoff.base";
+
+	/** The maximum backoff time between each registerStream attempt. */
+	public static final String REGISTER_STREAM_BACKOFF_MAX = "flink.stream.registerstreamconsumer.backoff.max";
+
+	/** The power constant for exponential backoff between each registerStream attempt. */
+	public static final String REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.registerstreamconsumer.backoff.expconst";
+
+	/** The maximum number of deregisterStream attempts if we get a recoverable exception. */
+	public static final String DEREGISTER_STREAM_RETRIES = "flink.stream.deregisterstreamconsumer.maxretries";
+
+	/** The base backoff time between each deregisterStream attempt. */
+	public static final String DEREGISTER_STREAM_BACKOFF_BASE = "flink.stream.deregisterstreamconsumer.backoff.base";
+
+	/** The maximum backoff time between each deregisterStream attempt. */
+	public static final String DEREGISTER_STREAM_BACKOFF_MAX = "flink.stream.deregisterstreamconsumer.backoff.max";
+
+	/** The power constant for exponential backoff between each deregisterStream attempt. */
+	public static final String DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.deregisterstreamconsumer.backoff.expconst";
+
+	/** The maximum number of listStream attempts if we get a recoverable exception. */
+	public static final String LIST_STREAM_CONSUMERS_RETRIES = "flink.stream.liststreamconsumer.maxretries";
+
+	/** The base backoff time between each listStream attempt. */
+	public static final String LIST_STREAM_CONSUMERS_BACKOFF_BASE = "flink.stream.liststreamconsumer.backoff.base";
+
+	/** The maximum backoff time between each listStream attempt. */
+	public static final String LIST_STREAM_CONSUMERS_BACKOFF_MAX = "flink.stream.liststreamconsumer.backoff.max";
+
+	/** The power constant for exponential backoff between each listStream attempt. */
+	public static final String LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.liststreamconsumer.backoff.expconst";
+
+	/** The maximum number of subscribeToShard attempts if we get a recoverable exception. */
+	public static final String SUBSCRIBE_TO_SHARD_RETRIES = "flink.shard.subscribetoshard.maxretries";
+
+	/** The base backoff time between each subscribeToShard  attempt. */
+	public static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE = "flink.shard.subscribetoshard.backoff.base";
+
+	/** The maximum backoff time between each subscribeToShard  attempt. */

Review comment:
       @xiaolong-sn nit: There is an extra space after `subscribeToShard`.
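
       For readers skimming these constants: each operation gets the same four settings (max retries, base backoff, max backoff, exponential constant). The diff does not show how the connector combines them, so the following is only a minimal sketch, assuming a capped exponential backoff with full jitter. `BackoffSketch` and its method names are hypothetical.

```java
import java.util.Random;

/** Hypothetical sketch, not the connector's code: one way the four backoff settings could fit together. */
final class BackoffSketch {

    /** Delay before the given retry attempt (0-based): base * power^attempt, capped at maxMillis. */
    static long cappedExponential(long baseMillis, long maxMillis, double power, int attempt) {
        return (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
    }

    /** Full jitter (an assumption): pick a random delay between 0 and the capped exponential value. */
    static long fullJitter(long baseMillis, long maxMillis, double power, int attempt, Random random) {
        return (long) (random.nextDouble() * cappedExponential(baseMillis, maxMillis, power, attempt));
    }
}
```

       Under this assumption, a base of 1000 ms, a max of 5000 ms and a constant of 1.5 would give un-jittered delays of 1000, 1500, 2250 and 3375 ms, then 5000 ms for every later attempt.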

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.

Review comment:
       @xiaolong-sn These comments are outdated: the fields are for the list stream consumers operation, not "list stream".

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings

Review comment:
       @xiaolong-sn I think this section comment is misplaced? It sits directly above the constructor.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams which is sent to match the EFO consumer arn if the EFO registration mode is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(

Review comment:
       @xiaolong-sn nit: Personally, I would break these parsing blocks into private methods for readability, but this is a matter of developer preference.
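
       A minimal sketch of what that refactoring could look like, assuming small static helpers that read a key or fall back to a default. `FanOutConfigSketch`, `getInt` and `getLong` are hypothetical names and the default values are placeholders; only the two property keys are taken from the `ConsumerConfigConstants` diff.

```java
import java.util.Properties;

/** Hypothetical sketch, not the PR's code: typed helpers replace the repeated parse-with-default blocks. */
final class FanOutConfigSketch {
    // Property keys as quoted in the ConsumerConfigConstants diff.
    static final String SUBSCRIBE_TO_SHARD_RETRIES = "flink.shard.subscribetoshard.maxretries";
    static final String SUBSCRIBE_TO_SHARD_BACKOFF_BASE = "flink.shard.subscribetoshard.backoff.base";

    final int subscribeToShardMaxRetries;
    final long subscribeToShardBaseBackoffMillis;

    FanOutConfigSketch(Properties configProps) {
        // The defaults (5 and 1000L) are placeholders, not the connector's real defaults.
        this.subscribeToShardMaxRetries = getInt(configProps, SUBSCRIBE_TO_SHARD_RETRIES, 5);
        this.subscribeToShardBaseBackoffMillis = getLong(configProps, SUBSCRIBE_TO_SHARD_BACKOFF_BASE, 1000L);
    }

    /** Reads an int property, falling back to defaultValue when the key is absent. */
    private static int getInt(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, Integer.toString(defaultValue)));
    }

    /** Reads a long property, falling back to defaultValue when the key is absent. */
    private static long getLong(Properties props, String key, long defaultValue) {
        return Long.parseLong(props.getProperty(key, Long.toString(defaultValue)));
    }
}
```

       Helpers that return values, rather than instance methods that assign the fields, allow the fields to stay `final`.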

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams which is sent to match the EFO consumer arn if the EFO registration mode is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.registerStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE)));
+		this.registerStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX)));
+		this.registerStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.registerStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES)));
+
+		this.deregisterStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE)));
+		this.deregisterStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX)));
+		this.deregisterStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.deregisterStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES)));
+
+		this.listStreamConsumersBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_BASE)));
+		this.listStreamConsumersMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_MAX)));
+		this.listStreamConsumersExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listStreamConsumersMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_RETRIES)));
+	}
+
+	// ------------------------------------------------------------------------
+	//  subscribeToShard() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Get maximum retry attempts for the subscribe to shard operation.
+	 */
+	public int getSubscribeToShardMaxRetries() {
+		return subscribeToShardMaxRetries;
+	}
+
+	/**
+	 * Get maximum backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardMaxBackoffMillis() {
+		return subscribeToShardMaxBackoffMillis;
+	}
+
+	/**
+	 * Get base backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardBaseBackoffMillis() {
+		return subscribeToShardBaseBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the subscribe to shard operation.
+	 */
+	public double getSubscribeToShardExpConstant() {
+		return subscribeToShardExpConstant;
+	}
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamBaseBackoffMillis() {
+		return registerStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamMaxBackoffMillis() {
+		return registerStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the register stream operation.
+	 */
+	public double getRegisterStreamExpConstant() {
+		return registerStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getRegisterStreamMaxRetries() {
+		return registerStreamMaxRetries;
+	}
+
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamBaseBackoffMillis() {
+		return deregisterStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamMaxBackoffMillis() {
+		return deregisterStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the deregister stream operation.
+	 */
+	public double getDeregisterStreamExpConstant() {
+		return deregisterStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the deregister stream operation.
+	 */
+	public int getDeregisterStreamMaxRetries() {
+		return deregisterStreamMaxRetries;
+	}
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersBaseBackoffMillis() {
+		return listStreamConsumersBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersMaxBackoffMillis() {
+		return listStreamConsumersMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the list stream consumers operation.
+	 */
+	public double getListStreamConsumersExpConstant() {
+		return listStreamConsumersExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the list stream consumers operation.
+	 */
+	public int getListStreamConsumersMaxRetries() {
+		return listStreamConsumersMaxRetries;
+	}
+
+	/**
+	 * Get efo registration type.
+	 */
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	/**
+	 * Get consumer name, will be null if efo registration type is 'NONE'.
+	 */
+	@Nullable
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Get stream consumer arns, will be null if efo registration type is 'LAZY' or 'EAGER'.
+	 */
+	@Nullable
+	public Map<String, String> getStreamConsumerArns() {

Review comment:
       @xiaolong-sn same as above, use `Optional`
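
For illustration, a minimal sketch of what `Optional`-returning getters could look like. The class `OptionalGetterSketch` and its constructor are hypothetical stand-ins reduced to the two nullable fields; this is not the PR's code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for FanOutProperties, reduced to the two nullable fields.
class OptionalGetterSketch {
	// Expected to be null when the registration type is NONE.
	private final String consumerName;
	// Expected to be null when the registration type is LAZY or EAGER.
	private final Map<String, String> streamConsumerArns;

	OptionalGetterSketch(String consumerName, Map<String, String> streamConsumerArns) {
		this.consumerName = consumerName;
		this.streamConsumerArns = streamConsumerArns;
	}

	// Callers get an empty Optional instead of null, so @Nullable is no longer needed.
	Optional<String> getConsumerName() {
		return Optional.ofNullable(consumerName);
	}

	Optional<Map<String, String>> getStreamConsumerArns() {
		return Optional.ofNullable(streamConsumerArns);
	}
}
```

With this shape the absence of a value is visible in the return type, so callers cannot forget the null case.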

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +245,72 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	/**
+	 * Validate the record publisher type.
+	 * @param config config properties
+	 * @return if `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is set, return the parsed record publisher type. Else return polling record publisher type.

Review comment:
       @xiaolong-sn nit: Looks like you are using markdown in the javadoc. Usually you would write code using `{@code code goes here}` and link to classes/values using `{@link MyClass}`.
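
As a hedged illustration of the Javadoc style meant here, the `@return` text could be written roughly as follows. The class `JavadocStyleSketch`, the nested enum, and the property key `example.recordpublisher` are placeholders invented for this sketch, and the method body is simplified (it does not rewrite the error message as the PR does).

```java
import java.util.Properties;

// Hypothetical, self-contained stand-in used only to show the Javadoc tags.
class JavadocStyleSketch {

	enum RecordPublisherType { EFO, POLLING }

	// Placeholder key for this sketch, not necessarily the connector's real property name.
	static final String RECORD_PUBLISHER_TYPE = "example.recordpublisher";

	/**
	 * Validate the record publisher type.
	 *
	 * @param config config properties
	 * @return the parsed {@link RecordPublisherType} if {@code RECORD_PUBLISHER_TYPE} is set,
	 *         otherwise {@link RecordPublisherType#POLLING}
	 */
	static RecordPublisherType validateRecordPublisherType(Properties config) {
		String value = config.getProperty(RECORD_PUBLISHER_TYPE);
		// valueOf throws IllegalArgumentException for unknown names.
		return value == null ? RecordPublisherType.POLLING : RecordPublisherType.valueOf(value);
	}
}
```

`{@code ...}` renders in monospace without interpreting HTML, while `{@link ...}` produces a navigable reference that the Javadoc tool checks.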

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manually set efo consumer arns for each stream. Should not be null if the efoRegistrationType is NONE.
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Maximum retry attempts for the subscribe to shard operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams which are used to match the EFO consumer arns if the EFO registration mode is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.registerStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE)));
+		this.registerStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX)));
+		this.registerStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.registerStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES)));
+
+		this.deregisterStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE)));
+		this.deregisterStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX)));
+		this.deregisterStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.deregisterStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES)));
+
+		this.listStreamConsumersBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_BASE)));
+		this.listStreamConsumersMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_MAX)));
+		this.listStreamConsumersExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listStreamConsumersMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_RETRIES)));
+	}
+
+	// ------------------------------------------------------------------------
+	//  subscribeToShard() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Get maximum retry attempts for the subscribe to shard operation.
+	 */
+	public int getSubscribeToShardMaxRetries() {
+		return subscribeToShardMaxRetries;
+	}
+
+	/**
+	 * Get maximum backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardMaxBackoffMillis() {
+		return subscribeToShardMaxBackoffMillis;
+	}
+
+	/**
+	 * Get base backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardBaseBackoffMillis() {
+		return subscribeToShardBaseBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the subscribe to shard operation.
+	 */
+	public double getSubscribeToShardExpConstant() {
+		return subscribeToShardExpConstant;
+	}
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamBaseBackoffMillis() {
+		return registerStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamMaxBackoffMillis() {
+		return registerStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the register stream operation.
+	 */
+	public double getRegisterStreamExpConstant() {
+		return registerStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getRegisterStreamMaxRetries() {
+		return registerStreamMaxRetries;
+	}
+
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamBaseBackoffMillis() {
+		return deregisterStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamMaxBackoffMillis() {
+		return deregisterStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the deregister stream operation.
+	 */
+	public double getDeregisterStreamExpConstant() {
+		return deregisterStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the deregister stream operation.
+	 */
+	public int getDeregisterStreamMaxRetries() {
+		return deregisterStreamMaxRetries;
+	}
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersBaseBackoffMillis() {
+		return listStreamConsumersBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersMaxBackoffMillis() {
+		return listStreamConsumersMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the list stream consumers operation.
+	 */
+	public double getListStreamConsumersExpConstant() {
+		return listStreamConsumersExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the list stream consumers operation.
+	 */
+	public int getListStreamConsumersMaxRetries() {
+		return listStreamConsumersMaxRetries;
+	}
+
+	/**
+	 * Get efo registration type.
+	 */
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	/**
+	 * Get consumer name, will be null if efo registration type is 'NONE'.
+	 */
+	@Nullable
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Get stream consumer arns, will be null if efo registration type is 'LAZY' or 'EAGER'.
+	 */
+	@Nullable
+	public Map<String, String> getStreamConsumerArns() {
+		return streamConsumerArns;
+	}
+
+	/**
+	 * Get the according consumer arn to the stream, will be null if efo registration type is 'LAZY' or 'EAGER'.
+	 */
+	@Nullable
+	public String getStreamConsumerArn(String stream) {
+		if (this.streamConsumerArns == null) {
+			return null;
+		}
+		return streamConsumerArns.get(stream);

Review comment:
       @xiaolong-sn nit: `Optional` means you do not need to null check:
   ```
   return Optional.ofNullable(streamConsumerArns).map(arns -> arns.get(stream));
   ```
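
To make the behaviour of that one-liner concrete, here is a small self-contained sketch. The class `StreamArnLookupSketch` is a hypothetical stand-in for `FanOutProperties`, and the stream name and ARN value in the usage note are made up.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in holding only the nullable stream-to-ARN map.
class StreamArnLookupSketch {
	private final Map<String, String> streamConsumerArns;

	StreamArnLookupSketch(Map<String, String> streamConsumerArns) {
		this.streamConsumerArns = streamConsumerArns;
	}

	// Empty when the map itself is null, and also empty when the stream has no entry,
	// because Optional.map turns a null mapper result into an empty Optional.
	Optional<String> getStreamConsumerArn(String stream) {
		return Optional.ofNullable(streamConsumerArns).map(arns -> arns.get(stream));
	}
}
```

A caller can then write, for example, `props.getStreamConsumerArn("stream-1").orElseThrow(() -> new IllegalStateException("no consumer arn"))` rather than checking for null.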

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -180,14 +182,108 @@ public void testUnrecognizableCredentialProviderTypeInConfig() {
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
+	// ----------------------------------------------------------------------
+	// validateRecordPublisherType() tests
+	// ----------------------------------------------------------------------
+	@Test
+	public void testNoRecordPublisherTypeInConfig() {
+		Properties testConfig = TestUtils.getStandardProperties();
+		ConsumerConfigConstants.RecordPublisherType recordPublisherType = KinesisConfigUtil.validateRecordPublisherType(testConfig);
+		assertEquals(recordPublisherType, ConsumerConfigConstants.RecordPublisherType.POLLING);
+	}
+
+	@Test
+	public void testUnrecognizableRecordPublisherTypeInConfig() {
+		String errorMessage = Arrays.stream(ConsumerConfigConstants.RecordPublisherType.values())
+			.map(Enum::name).collect(Collectors.joining(", "));
+		String msg = "Invalid record publisher type in stream set in config. Valid values are: " + errorMessage;
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage(msg);
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "unrecognizableRecordPublisher");
+
+		KinesisConfigUtil.validateRecordPublisherType(testConfig);
+	}
+	// ----------------------------------------------------------------------
+	// validateEfoConfiguration() tests

Review comment:
       @xiaolong-sn This comment looks like it should be on the line below?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +245,72 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	/**
+	 * Validate the record publisher type.
+	 * @param config config properties
+	 * @return if `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is set, return the parsed record publisher type. Else return polling record publisher type.
+	 */
+	public static RecordPublisherType validateRecordPublisherType(Properties config) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				return RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				String errorMessage = Arrays.stream(RecordPublisherType.values())
+					.map(Enum::name).collect(Collectors.joining(", "));
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + errorMessage);
+			}
+		} else {
+			return RecordPublisherType.POLLING;
+		}
+	}
+
+	/**
+	 * Validate if the given config is a valid EFO configuration.
+	 * @param config  config properties.
+	 * @param streams the streams which are used to match the EFO consumer arns if the EFO registration mode is set to `NONE`.
+	 */
+	public static void validateEfoConfiguration(Properties config, List<String> streams) {
+		String efoRegistrationType;

Review comment:
       @xiaolong-sn this String could be an `enum` instead, then you will not need to convert it back to none below
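
A rough sketch of the idea, assuming the registration type is parsed into the enum once and compared as an enum afterwards. The class `EfoRegistrationTypeSketch`, the nested enum, the helper `requiresConsumerArns`, and the property key `example.efo.registration` are hypothetical; the `EAGER` default mirrors the default used in the `FanOutProperties` constructor in this PR.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical, self-contained stand-in for the validation logic.
class EfoRegistrationTypeSketch {

	enum EFORegistrationType { LAZY, EAGER, NONE }

	// Placeholder key for this sketch, not necessarily the connector's real property name.
	static final String EFO_REGISTRATION_TYPE = "example.efo.registration";

	// Parse the property once; everything downstream works with the enum.
	static EFORegistrationType parseRegistrationType(Properties config) {
		String raw = config.getProperty(EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.name());
		try {
			return EFORegistrationType.valueOf(raw);
		} catch (IllegalArgumentException e) {
			String valid = Arrays.stream(EFORegistrationType.values())
				.map(Enum::name).collect(Collectors.joining(", "));
			throw new IllegalArgumentException(
				"Invalid efo registration type in config. Valid values are: " + valid);
		}
	}

	// Enum comparison replaces string comparison against "NONE".
	static boolean requiresConsumerArns(EFORegistrationType type) {
		return type == EFORegistrationType.NONE;
	}
}
```

Comparing enum constants with `==` also lets the compiler catch typos that a string comparison would silently accept.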

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manually set efo consumer arns for each stream. Should not be null if the efoRegistrationType is NONE.
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Maximum retry attempts for the subscribe to shard operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams which are used to match the EFO consumer arns if the EFO registration mode is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.registerStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE)));
+		this.registerStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX)));
+		this.registerStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.registerStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES)));
+
+		this.deregisterStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE)));
+		this.deregisterStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX)));
+		this.deregisterStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.deregisterStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES)));
+
+		this.listStreamConsumersBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_BASE)));
+		this.listStreamConsumersMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_MAX)));
+		this.listStreamConsumersExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listStreamConsumersMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_RETRIES)));
+	}
+
+	// ------------------------------------------------------------------------
+	//  subscribeToShard() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Get maximum retry attempts for the subscribe to shard operation.
+	 */
+	public int getSubscribeToShardMaxRetries() {
+		return subscribeToShardMaxRetries;
+	}
+
+	/**
+	 * Get maximum backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardMaxBackoffMillis() {
+		return subscribeToShardMaxBackoffMillis;
+	}
+
+	/**
+	 * Get base backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardBaseBackoffMillis() {
+		return subscribeToShardBaseBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the subscribe to shard operation.
+	 */
+	public double getSubscribeToShardExpConstant() {
+		return subscribeToShardExpConstant;
+	}
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamBaseBackoffMillis() {
+		return registerStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamMaxBackoffMillis() {
+		return registerStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the register stream operation.
+	 */
+	public double getRegisterStreamExpConstant() {
+		return registerStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getRegisterStreamMaxRetries() {
+		return registerStreamMaxRetries;
+	}
+
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamBaseBackoffMillis() {
+		return deregisterStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamMaxBackoffMillis() {
+		return deregisterStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the deregister stream operation.
+	 */
+	public double getDeregisterStreamExpConstant() {
+		return deregisterStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getDeregisterStreamMaxRetries() {
+		return deregisterStreamMaxRetries;
+	}
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings

Review comment:
       @xiaolong-sn This section header should say `listStreamConsumers()` rather than `listStream()`.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams which is sent to match the EFO consumer arn if the EFO registration mode is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.registerStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE)));
+		this.registerStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX)));
+		this.registerStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.registerStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES)));
+
+		this.deregisterStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE)));
+		this.deregisterStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX)));
+		this.deregisterStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.deregisterStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES)));
+
+		this.listStreamConsumersBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_BASE)));
+		this.listStreamConsumersMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_MAX)));
+		this.listStreamConsumersExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listStreamConsumersMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_RETRIES)));
+	}
+
+	// ------------------------------------------------------------------------
+	//  subscribeToShard() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Get maximum retry attempts for the subscribe to shard operation.
+	 */
+	public int getSubscribeToShardMaxRetries() {
+		return subscribeToShardMaxRetries;
+	}
+
+	/**
+	 * Get maximum backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardMaxBackoffMillis() {
+		return subscribeToShardMaxBackoffMillis;
+	}
+
+	/**
+	 * Get base backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardBaseBackoffMillis() {
+		return subscribeToShardBaseBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the subscribe to shard operation.
+	 */
+	public double getSubscribeToShardExpConstant() {
+		return subscribeToShardExpConstant;
+	}
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamBaseBackoffMillis() {
+		return registerStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamMaxBackoffMillis() {
+		return registerStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the register stream operation.
+	 */
+	public double getRegisterStreamExpConstant() {
+		return registerStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getRegisterStreamMaxRetries() {
+		return registerStreamMaxRetries;
+	}
+
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamBaseBackoffMillis() {
+		return deregisterStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamMaxBackoffMillis() {
+		return deregisterStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the deregister stream operation.
+	 */
+	public double getDeregisterStreamExpConstant() {
+		return deregisterStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getDeregisterStreamMaxRetries() {
+		return deregisterStreamMaxRetries;
+	}
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersBaseBackoffMillis() {
+		return listStreamConsumersBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersMaxBackoffMillis() {
+		return listStreamConsumersMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the list stream consumers operation.
+	 */
+	public double getListStreamConsumersExpConstant() {
+		return listStreamConsumersExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the list stream consumers operation.
+	 */
+	public int getListStreamConsumersMaxRetries() {
+		return listStreamConsumersMaxRetries;
+	}
+
+	/**
+	 * Get efo registration type.
+	 */
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	/**
+	 * Get consumer name, will be null if efo registration type is 'NONE'.
+	 */
+	@Nullable
+	public String getConsumerName() {

Review comment:
       @xiaolong-sn This is a [good use case](https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional) for `Optional`, if possible.
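
       For illustration only, a minimal sketch of what the suggestion could look like. `FanOutPropertiesSketch` is a hypothetical, trimmed-down stand-in for `FanOutProperties`, not the class from this PR. It assumes the linked style guide's advice to return `Optional` from public getters while keeping the field itself a plain nullable reference.

```java
import java.util.Optional;

/**
 * Hypothetical, trimmed-down stand-in for FanOutProperties.
 * Only the consumer name is modelled here.
 */
final class FanOutPropertiesSketch {

	// Kept as a plain nullable field (assumption: the style guide
	// discourages Optional for class fields).
	private final String consumerName;

	FanOutPropertiesSketch(String consumerName) {
		this.consumerName = consumerName;
	}

	/**
	 * Get the consumer name; empty if none was configured
	 * (for example when the registration type is NONE).
	 */
	Optional<String> getConsumerName() {
		return Optional.ofNullable(consumerName);
	}
}
```

       Callers would then handle the absent case explicitly, for example with `getConsumerName().orElseThrow(...)`, instead of relying on a `@Nullable` annotation and a null check.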

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Should not be Null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manual set efo consumer arns for each stream. Should not be Null if the efoRegistrationType is NONE
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams which is sent to match the EFO consumer arn if the EFO registration mode is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.registerStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE)));
+		this.registerStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX)));
+		this.registerStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.registerStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES)));
+
+		this.deregisterStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE)));
+		this.deregisterStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX)));
+		this.deregisterStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.deregisterStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES)));
+
+		this.listStreamConsumersBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_BASE)));
+		this.listStreamConsumersMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_MAX)));
+		this.listStreamConsumersExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listStreamConsumersMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_RETRIES)));
+	}
+
+	// ------------------------------------------------------------------------
+	//  subscribeToShard() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Get maximum retry attempts for the subscribe to shard operation.
+	 */
+	public int getSubscribeToShardMaxRetries() {
+		return subscribeToShardMaxRetries;
+	}
+
+	/**
+	 * Get maximum backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardMaxBackoffMillis() {
+		return subscribeToShardMaxBackoffMillis;
+	}
+
+	/**
+	 * Get base backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardBaseBackoffMillis() {
+		return subscribeToShardBaseBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the subscribe to shard operation.
+	 */
+	public double getSubscribeToShardExpConstant() {
+		return subscribeToShardExpConstant;
+	}
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamBaseBackoffMillis() {
+		return registerStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamMaxBackoffMillis() {
+		return registerStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the register stream operation.
+	 */
+	public double getRegisterStreamExpConstant() {
+		return registerStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getRegisterStreamMaxRetries() {
+		return registerStreamMaxRetries;
+	}
+
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamBaseBackoffMillis() {
+		return deregisterStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamMaxBackoffMillis() {
+		return deregisterStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the deregister stream operation.
+	 */
+	public double getDeregisterStreamExpConstant() {
+		return deregisterStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the deregister stream operation.
+	 */
+	public int getDeregisterStreamMaxRetries() {
+		return deregisterStreamMaxRetries;
+	}
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersBaseBackoffMillis() {
+		return listStreamConsumersBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersMaxBackoffMillis() {
+		return listStreamConsumersMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the list stream consumers operation.
+	 */
+	public double getListStreamConsumersExpConstant() {
+		return listStreamConsumersExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the list stream consumers operation.
+	 */
+	public int getListStreamConsumersMaxRetries() {
+		return listStreamConsumersMaxRetries;
+	}
+
+	/**
+	 * Get efo registration type.
+	 */
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	/**
+	 * Get consumer name, will be null if efo registration type is 'NONE'.
+	 */
+	@Nullable
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	/**
+	 * Get stream consumer arns, will be null if efo registration type is 'LAZY' or 'EAGER'.
+	 */
+	@Nullable
+	public Map<String, String> getStreamConsumerArns() {
+		return streamConsumerArns;
+	}
+
+	/**
+	 * Get the according consumer arn to the stream, will be null if efo registration type is 'LAZY' or 'EAGER'.
+	 */
+	@Nullable

Review comment:
       @xiaolong-sn same as above, use `Optional`
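
A minimal sketch of what the `Optional` suggestion might look like. The class below is a hypothetical, trimmed-down stand-in for `FanOutProperties` that models only the two nullable fields; the getter name `getStreamConsumerArn` is an assumption, since the real method signature is cut off in the hunk above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for FanOutProperties; not the class from the PR.
class FanOutPropertiesSketch {
    // null when the EFO registration type is NONE
    private final String consumerName;
    // null when the EFO registration type is LAZY or EAGER
    private final Map<String, String> streamConsumerArns;

    FanOutPropertiesSketch(String consumerName, Map<String, String> streamConsumerArns) {
        this.consumerName = consumerName;
        this.streamConsumerArns = streamConsumerArns;
    }

    // Returns an empty Optional instead of null when no consumer name is configured.
    Optional<String> getConsumerName() {
        return Optional.ofNullable(consumerName);
    }

    // Assumed getter name. Empty when no ARN map exists or the stream has no entry,
    // because Optional.map turns a null mapping result into an empty Optional.
    Optional<String> getStreamConsumerArn(String stream) {
        return Optional.ofNullable(streamConsumerArns).map(arns -> arns.get(stream));
    }
}
```

With this shape, callers no longer need a null check and can write something like `props.getConsumerName().orElseThrow(...)` at the point where the name is required.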

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +245,72 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	/**
+	 * Validate the record publisher type.
+	 * @param config config properties
+	 * @return if `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is set, return the parsed record publisher type. Else return polling record publisher type.
+	 */
+	public static RecordPublisherType validateRecordPublisherType(Properties config) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				return RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				String errorMessage = Arrays.stream(RecordPublisherType.values())
+					.map(Enum::name).collect(Collectors.joining(", "));
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + errorMessage);
+			}
+		} else {
+			return RecordPublisherType.POLLING;
+		}
+	}
+
+	/**
+	 * Validate if the given config is a valid EFO configuration.
+	 * @param config  config properties.
+	 * @param streams the streams used to look up the EFO consumer ARNs when the EFO registration type is set to `NONE`.
+	 */
+	public static void validateEfoConfiguration(Properties config, List<String> streams) {
+		String efoRegistrationType;

Review comment:
       @xiaolong-sn nit: this `String` could be an `enum` instead; then you would not need to convert it back to none below
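
A rough sketch of the enum-based approach, parsing the property once and branching on the enum afterwards. The enum values and the `EAGER` default follow the `FanOutProperties` hunk in this thread; the property key string, the class name, and both helper methods are placeholders for illustration and not the connector's actual API.

```java
import java.util.Properties;

// Hypothetical helper; only illustrates parsing the registration type into an enum up front.
class EfoRegistrationSketch {
    enum EFORegistrationType { LAZY, EAGER, NONE }

    // Placeholder key; the real value of ConsumerConfigConstants.EFO_REGISTRATION_TYPE is not shown in this thread.
    static final String EFO_REGISTRATION_TYPE = "example.efo.registration.type";

    // Parse once; every later check compares enum constants instead of strings.
    static EFORegistrationType parseRegistrationType(Properties config) {
        String raw = config.getProperty(EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.name());
        try {
            return EFORegistrationType.valueOf(raw);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid EFO registration type: " + raw, e);
        }
    }

    // Consumer ARNs are only required when registration is handled outside the job.
    static boolean requiresConsumerArns(EFORegistrationType type) {
        return type == EFORegistrationType.NONE;
    }
}
```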

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -180,14 +182,108 @@ public void testUnrecognizableCredentialProviderTypeInConfig() {
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
+	// ----------------------------------------------------------------------
+	// validateRecordPublisherType() tests
+	// ----------------------------------------------------------------------
+	@Test
+	public void testNoRecordPublisherTypeInConfig() {
+		Properties testConfig = TestUtils.getStandardProperties();
+		ConsumerConfigConstants.RecordPublisherType recordPublisherType = KinesisConfigUtil.validateRecordPublisherType(testConfig);
+		assertEquals(recordPublisherType, ConsumerConfigConstants.RecordPublisherType.POLLING);
+	}
+
+	@Test
+	public void testUnrecognizableRecordPublisherTypeInConfig() {
+		String errorMessage = Arrays.stream(ConsumerConfigConstants.RecordPublisherType.values())
+			.map(Enum::name).collect(Collectors.joining(", "));
+		String msg = "Invalid record publisher type in stream set in config. Valid values are: " + errorMessage;
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage(msg);
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "unrecognizableRecordPublisher");
+
+		KinesisConfigUtil.validateRecordPublisherType(testConfig);
+	}
+	// ----------------------------------------------------------------------
+	// validateEfoConfiguration() tests

Review comment:
       @xiaolong-sn nit: This comment looks like it should be on the line below?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   ## CI report:
   
   * 5aef6d842709a1ec308fb8cfe89e0928ebf19f7e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4660) 
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] xiaolong-sn commented on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-661807806


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12920:
URL: https://github.com/apache/flink/pull/12920#issuecomment-659996635


   ## CI report:
   
   * 68304c8bc1e5ccae037269302bd4c15ea41dc7a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4816) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4679) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4815) 
   





[GitHub] [flink] xiaolong-sn commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457980119



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties {
+
+	/**
+	 * The efo registration type for de-/registration of streams.
+	 */
+	private final EFORegistrationType efoRegistrationType;
+
+	/**
+	 * The efo stream consumer name. Must not be null if the efoRegistrationType is either LAZY or EAGER.
+	 */
+	@Nullable
+	private String consumerName;
+
+	/**
+	 * The manually configured efo consumer arns for each stream. Must not be null if the efoRegistrationType is NONE.
+	 */
+	@Nullable
+	private Map<String, String> streamConsumerArns;
+
+	/**
+	 * Maximum retry attempts for the subscribe to shard operation.
+	 */
+	private final int subscribeToShardMaxRetries;
+
+	/**
+	 * Maximum backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardMaxBackoffMillis;
+
+	/**
+	 * Base backoff millis for the subscribe to shard operation.
+	 */
+	private final long subscribeToShardBaseBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the subscribe to shard operation.
+	 */
+	private final double subscribeToShardExpConstant;
+
+	/**
+	 * Base backoff millis for the register stream operation.
+	 */
+	private final long registerStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the register stream operation.
+	 */
+	private final long registerStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the register stream operation.
+	 */
+	private final double registerStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the register stream operation.
+	 */
+	private final int registerStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the deregister stream operation.
+	 */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the deregister stream operation.
+	 */
+	private final double deregisterStreamExpConstant;
+
+	/**
+	 * Maximum retry attempts for the deregister stream operation.
+	 */
+	private final int deregisterStreamMaxRetries;
+
+	/**
+	 * Base backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersBaseBackoffMillis;
+
+	/**
+	 * Maximum backoff millis for the list stream operation.
+	 */
+	private final long listStreamConsumersMaxBackoffMillis;
+
+	/**
+	 * Exponential backoff power constant for the list stream operation.
+	 */
+	private final double listStreamConsumersExpConstant;
+
+	/**
+	 * Maximum retry attempts for the list stream operation.
+	 */
+	private final int listStreamConsumersMaxRetries;
+
+	// ------------------------------------------------------------------------
+	//  Constructor
+	// ------------------------------------------------------------------------
+	/**
+	 * Creates a FanOutProperties.
+	 *
+	 * @param configProps the configuration properties from config file.
+	 * @param streams     the streams used to look up the EFO consumer ARNs when the EFO registration type is set to `NONE`.
+	 */
+	public FanOutProperties(Properties configProps, List<String> streams) {
+		Preconditions.checkArgument(configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEfoConfiguration(configProps, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(configProps.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = configProps.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new HashMap<>();
+			for (String stream : streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.put(stream, configProps.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			configProps.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.registerStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_BASE)));
+		this.registerStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_MAX)));
+		this.registerStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.registerStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_REGISTER_STREAM_RETRIES)));
+
+		this.deregisterStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_BASE)));
+		this.deregisterStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_MAX)));
+		this.deregisterStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.deregisterStreamMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_DEREGISTER_STREAM_RETRIES)));
+
+		this.listStreamConsumersBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_BASE)));
+		this.listStreamConsumersMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_MAX)));
+		this.listStreamConsumersExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listStreamConsumersMaxRetries = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_STREAM_CONSUMERS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_STREAM_CONSUMERS_RETRIES)));
+	}
+
+	// ------------------------------------------------------------------------
+	//  subscribeToShard() related performance settings
+	// ------------------------------------------------------------------------
+	/**
+	 * Get maximum retry attempts for the subscribe to shard operation.
+	 */
+	public int getSubscribeToShardMaxRetries() {
+		return subscribeToShardMaxRetries;
+	}
+
+	/**
+	 * Get maximum backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardMaxBackoffMillis() {
+		return subscribeToShardMaxBackoffMillis;
+	}
+
+	/**
+	 * Get base backoff millis for the subscribe to shard operation.
+	 */
+	public long getSubscribeToShardBaseBackoffMillis() {
+		return subscribeToShardBaseBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the subscribe to shard operation.
+	 */
+	public double getSubscribeToShardExpConstant() {
+		return subscribeToShardExpConstant;
+	}
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamBaseBackoffMillis() {
+		return registerStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the register stream operation.
+	 */
+	public long getRegisterStreamMaxBackoffMillis() {
+		return registerStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the register stream operation.
+	 */
+	public double getRegisterStreamExpConstant() {
+		return registerStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the register stream operation.
+	 */
+	public int getRegisterStreamMaxRetries() {
+		return registerStreamMaxRetries;
+	}
+
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamBaseBackoffMillis() {
+		return deregisterStreamBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the deregister stream operation.
+	 */
+	public long getDeregisterStreamMaxBackoffMillis() {
+		return deregisterStreamMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the deregister stream operation.
+	 */
+	public double getDeregisterStreamExpConstant() {
+		return deregisterStreamExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the deregister stream operation.
+	 */
+	public int getDeregisterStreamMaxRetries() {
+		return deregisterStreamMaxRetries;
+	}
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Get base backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersBaseBackoffMillis() {
+		return listStreamConsumersBaseBackoffMillis;
+	}
+
+	/**
+	 * Get maximum backoff millis for the list stream consumers operation.
+	 */
+	public long getListStreamConsumersMaxBackoffMillis() {
+		return listStreamConsumersMaxBackoffMillis;
+	}
+
+	/**
+	 * Get exponential backoff power constant for the list stream consumers operation.
+	 */
+	public double getListStreamConsumersExpConstant() {
+		return listStreamConsumersExpConstant;
+	}
+
+	/**
+	 * Get maximum retry attempts for the list stream consumers operation.
+	 */
+	public int getListStreamConsumersMaxRetries() {
+		return listStreamConsumersMaxRetries;
+	}
+
+	/**
+	 * Get efo registration type.
+	 */
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	/**
+	 * Get consumer name, will be null if efo registration type is 'NONE'.
+	 */
+	@Nullable
+	public String getConsumerName() {

Review comment:
       I think you are right. I'm a big fan of `Optional`.



