You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:29 UTC

[5/6] flink git commit: [FLINK-7367] [kinesis] Generalize configuration for FlinkKinesisProducer properties

[FLINK-7367] [kinesis] Generalize configuration for FlinkKinesisProducer properties


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed5d9a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed5d9a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed5d9a1

Branch: refs/heads/master
Commit: 9ed5d9a180dcd871e33bf8982434e3afd90ed295
Parents: 98737f9
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Aug 3 20:59:02 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:27 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 27 +++++---
 .../kinesis/FlinkKinesisProducer.java           | 25 ++-----
 .../kinesis/config/ProducerConfigConstants.java | 16 ++++-
 .../kinesis/util/KinesisConfigUtil.java         | 48 +++++++++++--
 .../kinesis/FlinkKinesisConsumerTest.java       | 72 --------------------
 .../kinesis/util/KinesisConfigUtilTest.java     | 66 ++++++++++++++++++
 6 files changed, 147 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 5fbf24b..1eea308 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -256,23 +256,29 @@ consumer when calling this API can also be modified by using the other keys pref
 
 ## Kinesis Producer
 
-The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
-Flink's checkpointing and doesn't provide exactly-once processing guarantees.
-Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
+The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
+
+Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
 
 In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
 
 To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
-For the monitoring to work, the user accessing the stream needs access to the Cloud watch service.
+For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties producerConfig = new Properties();
+// Required configs
 producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
 producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional configs
+producerConfig.put("AggregationMaxCount", "4294967295");
+producerConfig.put("CollectionMaxCount", "1000");
+producerConfig.put("RecordTtl", "30000");
+producerConfig.put("RequestTimeout", "6000");
 
 FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
 kinesis.setFailOnError(true);
@@ -286,9 +292,15 @@ simpleStringStream.addSink(kinesis);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val producerConfig = new Properties();
+// Required configs
 producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
 producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional configs
+producerConfig.put("AggregationMaxCount", "4294967295");
+producerConfig.put("CollectionMaxCount", "1000");
+producerConfig.put("RecordTtl", "30000");
+producerConfig.put("RequestTimeout", "6000");
 
 val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
 kinesis.setFailOnError(true);
@@ -301,15 +313,14 @@ simpleStringStream.addSink(kinesis);
 </div>
 </div>
 
-The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
-instance as described above for the consumer. The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
+The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
+
+If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100.
 
 Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is
 done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
 Otherwise, the returned stream name is used.
 
-Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
-
 
 ## Using Non-AWS Kinesis Endpoints for Testing
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index e0d3e38..1f5e64c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -19,13 +19,11 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.PropertiesUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
@@ -91,7 +89,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	 * This is a constructor supporting Flink's {@see SerializationSchema}.
 	 *
 	 * @param schema Serialization schema for the data type
-	 * @param configProps The properties used to configure AWS credentials and AWS region
+	 * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
 	 */
 	public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
 
@@ -116,13 +114,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	 * This is a constructor supporting {@see KinesisSerializationSchema}.
 	 *
 	 * @param schema Kinesis serialization schema for the data type
-	 * @param configProps The properties used to configure AWS credentials and AWS region
+	 * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
 	 */
 	public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
-		this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-		// check the configuration properties for any conflicting settings
-		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+		checkNotNull(configProps, "configProps can not be null");
+		this.configProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(configProps);
 
 		checkNotNull(schema, "serialization schema cannot be null");
 		checkArgument(
@@ -174,18 +170,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 
-		KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
-
-		producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
+		// check and pass the configuration properties
+		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
 		producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
-		if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
-			producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
-					ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
-		}
-		if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
-			producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
-					ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
-		}
 
 		producer = new KinesisProducer(producerConfig);
 		callback = new FutureCallback<UserRecordResult>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index d131150..983687e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,14 +20,24 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
 /**
+ * @deprecated
+ *
  * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
  */
+@Deprecated
 public class ProducerConfigConstants extends AWSConfigConstants {
 
-	/** Maximum number of items to pack into an PutRecords request. **/
+	/**
+	 * @deprecated
+	 *
+	 * Deprecated key. **/
+	@Deprecated
 	public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
 
-	/** Maximum number of items to pack into an aggregated record. **/
+ 	/**
+	 * @deprecated
+	 *
+	 * Deprecated key. **/
+	@Deprecated
 	public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 42f1af0..997191c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
 import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -38,6 +39,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
+
+	/** Maximum number of items to pack into an PutRecords request. **/
+	protected static final String COLLECTION_MAX_COUNT = "CollectionMaxCount";
+
+	/** Maximum number of items to pack into an aggregated record. **/
+	protected static final String AGGREGATION_MAX_COUNT = "AggregationMaxCount";
+
+	/** Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
+	 * The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw
+	 * RateLimitExceededException too frequently and breaks Flink sink as a result.
+	 **/
+	private static final String RATE_LIMIT = "RateLimit";
+
+	/** Default values for RateLimit. **/
+	private static final String DEFAULT_RATE_LIMIT = "100";
+
 	/**
 	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
 	 */
@@ -127,18 +144,39 @@ public class KinesisConfigUtil {
 	}
 
 	/**
+	 * Replace deprecated configuration properties for {@link FlinkKinesisProducer}.
+	 * This should be remove along with deprecated keys
+	 */
+	public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
+		// Replace deprecated key
+		if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
+			configProps.setProperty(COLLECTION_MAX_COUNT,
+					configProps.getProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT));
+			configProps.remove(ProducerConfigConstants.COLLECTION_MAX_COUNT);
+		}
+		// Replace deprecated key
+		if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
+			configProps.setProperty(AGGREGATION_MAX_COUNT,
+					configProps.getProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT));
+			configProps.remove(ProducerConfigConstants.AGGREGATION_MAX_COUNT);
+		}
+		return configProps;
+	}
+
+	/**
 	 * Validate configuration properties for {@link FlinkKinesisProducer}.
 	 */
-	public static void validateProducerConfiguration(Properties config) {
+	public static KinesisProducerConfiguration validateProducerConfiguration(Properties config) {
 		checkNotNull(config, "config can not be null");
 
 		validateAwsConfiguration(config);
 
-		validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT,
-			"Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value.");
+		// Override KPL default value if it's not specified by user
+		if (!config.containsKey(RATE_LIMIT)) {
+			config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
+		}
 
-		validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT,
-			"Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value.");
+		return KinesisProducerConfiguration.fromProperties(config);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 6af4c62..4a007d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -46,7 +45,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.InstantiationUtil;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -507,76 +505,6 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	// ----------------------------------------------------------------------
-	// FlinkKinesisConsumer.validateProducerConfiguration() tests
-	// ----------------------------------------------------------------------
-
-	@Test
-	public void testUnparsableLongForCollectionMaxCountInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");
-
-		KinesisConfigUtil.validateProducerConfiguration(testConfig);
-	}
-
-	@Test
-	public void testUnparsableLongForAggregationMaxCountInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-		testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");
-
-		KinesisConfigUtil.validateProducerConfiguration(testConfig);
-	}
-
-	// ----------------------------------------------------------------------
-	// Tests to verify serializability
-	// ----------------------------------------------------------------------
-
-	@Test
-	public void testCreateWithNonSerializableDeserializerFails() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("The provided deserialization schema is not serializable");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		new FlinkKinesisConsumer<>("test-stream", new NonSerializableDeserializationSchema(), testConfig);
-	}
-
-	@Test
-	public void testCreateWithSerializableDeserializer() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig);
-	}
-
-	@Test
-	public void testConsumerIsSerializable() {
-		Properties testConfig = new Properties();
-		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig);
-		assertTrue(InstantiationUtil.isSerializable(consumer));
-	}
-
-	// ----------------------------------------------------------------------
 	// Tests related to state initialization
 	// ----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
new file mode 100644
index 0000000..d14ac04
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for KinesisConfigUtil.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+public class KinesisConfigUtilTest {
+	@Rule
+	private ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testUnparsableLongForProducerConfiguration() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty("RateLimit", "unparsableLong");
+
+		KinesisConfigUtil.validateProducerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testReplaceDeprecatedKeys() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
+		testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
+		Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);
+
+		assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
+		assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
+	}
+}