You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/17 13:26:48 UTC

[GitHub] [flink] dannycranmer commented on a change in pull request #12920: [FLINK-18536][kinesis] Adding enhanced fan-out related configurations.

dannycranmer commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r456415363



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;

Review comment:
       @xiaolong-sn missing Apache copyright statement

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {

Review comment:
       @xiaolong-sn missing comment ["All public/protected methods and classes must have a Javadoc."](https://flink.apache.org/contributing/code-style-and-quality-formatting.html)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());

Review comment:
       @xiaolong-sn this validation is general stream consumer validation yet is within the `validateEFOConfiguration` method. Suggest moving to a new method and invoke from the parent to restrict responsibility. 

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;

Review comment:
       @xiaolong-sn Missing Apache copyright 

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());
+			}
+			if (RecordPublisherType.valueOf(recordPublisherType) == RecordPublisherType.EFO) {
+				String efoRegistrationType;
+				if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+					efoRegistrationType = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+					// specified efo registration type in stream must be either LAZY, EAGER or NONE.
+					try {
+						EFORegistrationType.valueOf(efoRegistrationType);
+					} catch (IllegalArgumentException e) {
+						StringBuilder sb = new StringBuilder();
+						for (EFORegistrationType ert : EFORegistrationType.values()) {
+							sb.append(ert.toString()).append(", ");
+						}
+						throw new IllegalArgumentException("Invalid efo registration type in stream set in config. Valid values are: " + sb.toString());
+					}
+				} else {
+					efoRegistrationType = EFORegistrationType.LAZY.toString();
+				}
+				if (EFORegistrationType.valueOf(efoRegistrationType) == EFORegistrationType.NONE) {
+					//if the registration type is NONE, then for each stream there must be an according consumer ARN
+					for (String stream : streams) {
+						String efoConsumerARNKey = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+						if (!config.containsKey(efoConsumerARNKey)) {
+							throw new IllegalArgumentException("Invalid efo consumer arn settings for not providing " + efoConsumerARNKey);

Review comment:
       @xiaolong-sn nit: This is fine, however consider collecting all missing keys rather than fail fast to give the user a single error message

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {

Review comment:
       @xiaolong-sn nit: spaces around `:`

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -180,6 +182,104 @@ public void testUnrecognizableCredentialProviderTypeInConfig() {
 		KinesisConfigUtil.validateAwsConfiguration(testConfig);
 	}
 
+	// ----------------------------------------------------------------------
+	// validateEFOConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testNoRecordPublisherTypeInConfig() {
+		Properties testConfig = TestUtils.getStandardProperties();
+
+		KinesisConfigUtil.validateEFOConfiguration(testConfig, new ArrayList<>());

Review comment:
       @xiaolong-sn nit: `Collections.emptyList()` is more expressive

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {

Review comment:
       @xiaolong-sn does this need to be `Serializable`? I think the `Properties` are serialised which are used to construct this class, and therefore this class does not need it.

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutPropertiesTest.java
##########
@@ -0,0 +1,120 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FanOutProperties}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FanOutProperties.class)
+public class FanOutPropertiesTest extends TestLogger {
+	@Rule
+	private ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testPollingRecordPublisher() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Only efo record publisher can register a FanOutProperties.");
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.POLLING.toString());
+
+		new FanOutProperties(testConfig, new ArrayList<>());
+	}
+
+	@Test
+	public void testEagerStrategyWithConsumerName() {
+		String fakedConsumerName = "fakedconsumername";
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.EFO.toString());
+		testConfig.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, fakedConsumerName);
+		FanOutProperties fanOutProperties = new FanOutProperties(testConfig, new ArrayList<>());
+		assertEquals(fanOutProperties.getConsumerName(), fakedConsumerName);
+	}
+
+	@Test
+	public void testEagerStrategyWithNoConsumerName() {
+		String msg = "No valid enhanced fan-out consumer name is set through " + ConsumerConfigConstants.EFO_CONSUMER_NAME;
+
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage(msg);
+
+		Properties testConfig = TestUtils.getStandardProperties();
+		testConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, RecordPublisherType.EFO.toString());
+		new FanOutProperties(testConfig, new ArrayList<>());
+	}
+
+	@Test
+	public void testNoneStrategyWithStreams() {
+		String fakedStream1 = "fakedstream1";

Review comment:
       @xiaolong-sn these 5 lines can be reduced to `List<String> streams = Arrays.asList("fakedstream1", "fakedstream2");`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
 	/* Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
+	// ------------------------------------------------------------------------
+	//  registerStream() related performance settings
+	// ------------------------------------------------------------------------
+	/** Base backoff millis for the register stream operation. */
+	private final long registerStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the register stream operation. */
+	private final long registerStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the register stream operation. */
+	private final double registerStreamExpConstant;
+
+	/** Maximum retry attempts for the register stream operation. */
+	private final int registerStreamMaxRetries;
+	// ------------------------------------------------------------------------
+	//  deregisterStream() related performance settings
+	// ------------------------------------------------------------------------
+	/** Base backoff millis for the deregister stream operation. */
+	private final long deregisterStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the deregister stream operation. */
+	private final long deregisterStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the deregister stream operation. */
+	private final double deregisterStreamExpConstant;
+
+	/** Maximum retry attempts for the deregister stream operation. */
+	private final int deregisterStreamMaxRetries;
+	// ------------------------------------------------------------------------
+	//  listStream() related performance settings

Review comment:
       @xiaolong-sn these should be `listStreamConsumers`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
##########
@@ -137,6 +137,48 @@
 	/* Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
+	// ------------------------------------------------------------------------

Review comment:
       @xiaolong-sn as discussed the plan was to implement the de-/registration in the `KinesisProxyV2`. Can we please move these properties to a new configuration class, similar to the `FanOutProperties`, or merge them all into a single configuration class?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -159,6 +174,54 @@ public static void validateConsumerConfiguration(Properties config) {
 		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
 			"Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");
 
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.REGISTER_STREAM_RETRIES,
+			"Invalid value given for maximum retry attempts for register stream operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_BASE,
+			"Invalid value given for register stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_MAX,
+			"Invalid value given for register stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.REGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for register stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_RETRIES,
+			"Invalid value given for maximum retry attempts for deregister stream operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_BASE,
+			"Invalid value given for deregister stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_MAX,
+			"Invalid value given for deregister stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.DEREGISTER_STREAM_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for deregister stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.LIST_STREAM_RETRIES,

Review comment:
       @xiaolong-sn should be list stream consumers (for all the new list properties)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			properties.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+	}
+
+	public void setEfoRegistrationType(EFORegistrationType efoRegistrationType) {

Review comment:
       @xiaolong-sn as the setters are not needed, we should delete them and make the class immutable (imo). Then we can make the fields `final` (["A good general approach is to try and make as many fields of a class final as possible"](https://flink.apache.org/contributing/code-style-and-quality-common.html))

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable

Review comment:
       @xiaolong-sn nit: My preference is a new line before each field (before the annotations)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;

Review comment:
       @xiaolong-sn having a `List` means we lose the Stream > ARN relationship. Can you use a `Map` instead where the key is stream and value is ARN?

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
##########
@@ -277,7 +377,7 @@ public void testIllegalValueForInitialTimestampInConfig() {
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
 		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
 
-		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig, new ArrayList<>());

Review comment:
       @xiaolong-sn consider overloading the method for backwards compatibility

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -66,27 +74,35 @@
 	 **/
 	protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
 
-	/** Default values for RateLimit. **/
+	/**
+	 * Default values for RateLimit.
+	 **/
 	protected static final long DEFAULT_RATE_LIMIT = 100L;
 
-	/** Default value for ThreadingModel. **/
+	/**
+	 * Default value for ThreadingModel.
+	 **/
 	protected static final KinesisProducerConfiguration.ThreadingModel DEFAULT_THREADING_MODEL = KinesisProducerConfiguration.ThreadingModel.POOLED;
 
-	/** Default values for ThreadPoolSize. **/
+	/**
+	 * Default values for ThreadPoolSize.
+	 **/
 	protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
 
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
-	public static void validateConsumerConfiguration(Properties config) {
+	public static void validateConsumerConfiguration(Properties config, List<String> streams) {
 		checkNotNull(config, "config can not be null");
 
 		validateAwsConfiguration(config);
 
+		validateEFOConfiguration(config, streams);

Review comment:
       @xiaolong-sn nit: Camelcase applies to acronyms too, typically `validateEFOConfiguration` should be `validateEfoConfiguration`. This applies to all instances of fields/methods in the PR

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));

Review comment:
       @xiaolong-sn using a map `streamConsumerArns.put(stream, properties.getProperty(key));`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {

Review comment:
       @xiaolong-sn Consider ["Avoid deep nesting of scopes, by flipping the if condition and exiting early."](https://flink.apache.org/contributing/code-style-and-quality-common.html)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties

Review comment:
       @xiaolong-sn some of your comments might violate the "golden rule", ["Golden rule: Comment as much as necessary to support code understanding, but don’t add redundant information."](https://flink.apache.org/contributing/code-style-and-quality-common.html#comments-and-code-readability)

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/fanout/FanOutProperties.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.streaming.connectors.kinesis.internals.fanout;
+
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * This is a configuration class for enhanced fan-out components.
+ */
+public class FanOutProperties implements Serializable {
+	private static final long serialVersionUID = 3204635913413261619L;
+
+	private EFORegistrationType efoRegistrationType;
+	@Nullable
+	private String consumerName;
+	@Nullable
+	private List<String> streamConsumerArns;
+
+	private int subscribeToShardMaxRetries;
+
+	private long subscribeToShardMaxBackoffMillis;
+
+	private long subscribeToShardBaseBackoffMillis;
+
+	private double subscribeToShardExpConstant;
+
+	public FanOutProperties(Properties properties, List<String> streams) {
+		//validate the properties
+		Preconditions.checkArgument(properties.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE).equals(RecordPublisherType.EFO.toString()), "Only efo record publisher can register a FanOutProperties.");
+		KinesisConfigUtil.validateEFOConfiguration(properties, streams);
+
+		efoRegistrationType = EFORegistrationType.valueOf(properties.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, EFORegistrationType.EAGER.toString()));
+		//if efo registration type is EAGER|LAZY, then user should explicitly provide a consumer name for each stream.
+		if (efoRegistrationType == EFORegistrationType.EAGER || efoRegistrationType == EFORegistrationType.LAZY) {
+			consumerName = properties.getProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME);
+		} else {
+			//else users should explicitly provide consumer arns.
+			streamConsumerArns = new ArrayList<>();
+			for (String stream:streams) {
+				String key = ConsumerConfigConstants.EFO_CONSUMER_ARN_PREFIX + "." + stream;
+				streamConsumerArns.add(properties.getProperty(key));
+			}
+		}
+
+		this.subscribeToShardMaxRetries = Integer.parseInt(
+			properties.getProperty(
+				ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_RETRIES)));
+		this.subscribeToShardBaseBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_BASE)));
+		this.subscribeToShardMaxBackoffMillis = Long.parseLong(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_MAX)));
+		this.subscribeToShardExpConstant = Double.parseDouble(
+			properties.getProperty(ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT)));
+	}
+
+	public void setEfoRegistrationType(EFORegistrationType efoRegistrationType) {
+		this.efoRegistrationType = efoRegistrationType;
+	}
+
+	public void setConsumerName(@Nullable String consumerName) {
+		this.consumerName = consumerName;
+	}
+
+	public void setStreamConsumerArns(@Nullable List<String> streamConsumerArns) {
+		this.streamConsumerArns = streamConsumerArns;
+	}
+
+	public void setSubscribeToShardMaxRetries(int subscribeToShardMaxRetries) {
+		this.subscribeToShardMaxRetries = subscribeToShardMaxRetries;
+	}
+
+	public void setSubscribeToShardMaxBackoffMillis(long subscribeToShardMaxBackoffMillis) {
+		this.subscribeToShardMaxBackoffMillis = subscribeToShardMaxBackoffMillis;
+	}
+
+	public void setSubscribeToShardBaseBackoffMillis(long subscribeToShardBaseBackoffMillis) {
+		this.subscribeToShardBaseBackoffMillis = subscribeToShardBaseBackoffMillis;
+	}
+
+	public void setSubscribeToShardExpConstant(double subscribeToShardExpConstant) {
+		this.subscribeToShardExpConstant = subscribeToShardExpConstant;
+	}
+
+	public EFORegistrationType getEfoRegistrationType() {
+		return efoRegistrationType;
+	}
+
+	@Nullable
+	public String getConsumerName() {
+		return consumerName;
+	}
+
+	@Nullable
+	public List<String> getStreamConsumerArns() {

Review comment:
       @xiaolong-sn when we have a map we can add a method `public String getStreamConsumerArn(String stream)`

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +232,55 @@ public static void validateConsumerConfiguration(Properties config) {
 		}
 	}
 
+	public static void validateEFOConfiguration(Properties config, List<String> streams) {
+		if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+			String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+			// specified record publisher type in stream must be either EFO or POLLING
+			try {
+				RecordPublisherType.valueOf(recordPublisherType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (RecordPublisherType rp : RecordPublisherType.values()) {
+					sb.append(rp.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + sb.toString());
+			}
+			if (RecordPublisherType.valueOf(recordPublisherType) == RecordPublisherType.EFO) {
+				String efoRegistrationType;
+				if (config.containsKey(ConsumerConfigConstants.EFO_REGISTRATION_TYPE)) {
+					efoRegistrationType = config.getProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE);
+					// specified efo registration type in stream must be either LAZY, EAGER or NONE.
+					try {
+						EFORegistrationType.valueOf(efoRegistrationType);
+					} catch (IllegalArgumentException e) {
+						StringBuilder sb = new StringBuilder();

Review comment:
       @xiaolong-sn nit: nit be nicer to build with stream
   
   ```
   String errorMessage = Arrays.stream(EFORegistrationType.values())
       .map(Enum::name).collect(Collectors.joining(", "));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org