You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:29 UTC
[5/6] flink git commit: [FLINK-7367] [kinesis] Generalize
configuration for FlinkKinesisProducer properties
[FLINK-7367] [kinesis] Generalize configuration for FlinkKinesisProducer properties
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed5d9a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed5d9a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed5d9a1
Branch: refs/heads/master
Commit: 9ed5d9a180dcd871e33bf8982434e3afd90ed295
Parents: 98737f9
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Aug 3 20:59:02 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:27 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kinesis.md | 27 +++++---
.../kinesis/FlinkKinesisProducer.java | 25 ++-----
.../kinesis/config/ProducerConfigConstants.java | 16 ++++-
.../kinesis/util/KinesisConfigUtil.java | 48 +++++++++++--
.../kinesis/FlinkKinesisConsumerTest.java | 72 --------------------
.../kinesis/util/KinesisConfigUtilTest.java | 66 ++++++++++++++++++
6 files changed, 147 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 5fbf24b..1eea308 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -256,23 +256,29 @@ consumer when calling this API can also be modified by using the other keys pref
## Kinesis Producer
-The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
-Flink's checkpointing and doesn't provide exactly-once processing guarantees.
-Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
+The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
+
+Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
-For the monitoring to work, the user accessing the stream needs access to the Cloud watch service.
+For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties producerConfig = new Properties();
+// Required configs
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional configs
+producerConfig.put("AggregationMaxCount", "4294967295");
+producerConfig.put("CollectionMaxCount", "1000");
+producerConfig.put("RecordTtl", "30000");
+producerConfig.put("RequestTimeout", "6000");
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
@@ -286,9 +292,15 @@ simpleStringStream.addSink(kinesis);
<div data-lang="scala" markdown="1">
{% highlight scala %}
val producerConfig = new Properties();
+// Required configs
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional configs
+producerConfig.put("AggregationMaxCount", "4294967295");
+producerConfig.put("CollectionMaxCount", "1000");
+producerConfig.put("RecordTtl", "30000");
+producerConfig.put("RequestTimeout", "6000");
val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
kinesis.setFailOnError(true);
@@ -301,15 +313,14 @@ simpleStringStream.addSink(kinesis);
</div>
</div>
-The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
-instance as described above for the consumer. The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
+The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
+
+If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100.
Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is
done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
Otherwise, the returned stream name is used.
-Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
-
## Using Non-AWS Kinesis Endpoints for Testing
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index e0d3e38..1f5e64c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -19,13 +19,11 @@ package org.apache.flink.streaming.connectors.kinesis;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.PropertiesUtil;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
@@ -91,7 +89,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
* This is a constructor supporting Flink's {@see SerializationSchema}.
*
* @param schema Serialization schema for the data type
- * @param configProps The properties used to configure AWS credentials and AWS region
+ * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
*/
public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
@@ -116,13 +114,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
* This is a constructor supporting {@see KinesisSerializationSchema}.
*
* @param schema Kinesis serialization schema for the data type
- * @param configProps The properties used to configure AWS credentials and AWS region
+ * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
*/
public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
- this.configProps = checkNotNull(configProps, "configProps can not be null");
-
- // check the configuration properties for any conflicting settings
- KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+ checkNotNull(configProps, "configProps can not be null");
+ this.configProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(configProps);
checkNotNull(schema, "serialization schema cannot be null");
checkArgument(
@@ -174,18 +170,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
-
- producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
+ // check and pass the configuration properties
+ KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
- if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
- producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
- ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
- }
- if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
- producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
- ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
- }
producer = new KinesisProducer(producerConfig);
callback = new FutureCallback<UserRecordResult>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index d131150..983687e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,14 +20,24 @@ package org.apache.flink.streaming.connectors.kinesis.config;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
/**
+ * @deprecated
+ *
* Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
*/
+@Deprecated
public class ProducerConfigConstants extends AWSConfigConstants {
- /** Maximum number of items to pack into an PutRecords request. **/
+ /**
+ * @deprecated
+ *
+ * Deprecated key. **/
+ @Deprecated
public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
- /** Maximum number of items to pack into an aggregated record. **/
+ /**
+ * @deprecated
+ *
+ * Deprecated key. **/
+ @Deprecated
public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 42f1af0..997191c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -38,6 +39,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Utilities for Flink Kinesis connector configuration.
*/
public class KinesisConfigUtil {
+
+ /** Maximum number of items to pack into an PutRecords request. **/
+ protected static final String COLLECTION_MAX_COUNT = "CollectionMaxCount";
+
+ /** Maximum number of items to pack into an aggregated record. **/
+ protected static final String AGGREGATION_MAX_COUNT = "AggregationMaxCount";
+
+ /** Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
+ * The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw
+ * RateLimitExceededException too frequently and breaks Flink sink as a result.
+ **/
+ private static final String RATE_LIMIT = "RateLimit";
+
+ /** Default values for RateLimit. **/
+ private static final String DEFAULT_RATE_LIMIT = "100";
+
/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
*/
@@ -127,18 +144,39 @@ public class KinesisConfigUtil {
}
/**
+ * Replace deprecated configuration properties for {@link FlinkKinesisProducer}.
+ * This should be remove along with deprecated keys
+ */
+ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
+ // Replace deprecated key
+ if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
+ configProps.setProperty(COLLECTION_MAX_COUNT,
+ configProps.getProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT));
+ configProps.remove(ProducerConfigConstants.COLLECTION_MAX_COUNT);
+ }
+ // Replace deprecated key
+ if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
+ configProps.setProperty(AGGREGATION_MAX_COUNT,
+ configProps.getProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT));
+ configProps.remove(ProducerConfigConstants.AGGREGATION_MAX_COUNT);
+ }
+ return configProps;
+ }
+
+ /**
* Validate configuration properties for {@link FlinkKinesisProducer}.
*/
- public static void validateProducerConfiguration(Properties config) {
+ public static KinesisProducerConfiguration validateProducerConfiguration(Properties config) {
checkNotNull(config, "config can not be null");
validateAwsConfiguration(config);
- validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT,
- "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value.");
+ // Override KPL default value if it's not specified by user
+ if (!config.containsKey(RATE_LIMIT)) {
+ config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
+ }
- validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT,
- "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value.");
+ return KinesisProducerConfiguration.fromProperties(config);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 6af4c62..4a007d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -46,7 +45,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.InstantiationUtil;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -507,76 +505,6 @@ public class FlinkKinesisConsumerTest {
}
// ----------------------------------------------------------------------
- // FlinkKinesisConsumer.validateProducerConfiguration() tests
- // ----------------------------------------------------------------------
-
- @Test
- public void testUnparsableLongForCollectionMaxCountInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");
-
- KinesisConfigUtil.validateProducerConfiguration(testConfig);
- }
-
- @Test
- public void testUnparsableLongForAggregationMaxCountInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");
-
- KinesisConfigUtil.validateProducerConfiguration(testConfig);
- }
-
- // ----------------------------------------------------------------------
- // Tests to verify serializability
- // ----------------------------------------------------------------------
-
- @Test
- public void testCreateWithNonSerializableDeserializerFails() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("The provided deserialization schema is not serializable");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisConsumer<>("test-stream", new NonSerializableDeserializationSchema(), testConfig);
- }
-
- @Test
- public void testCreateWithSerializableDeserializer() {
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig);
- }
-
- @Test
- public void testConsumerIsSerializable() {
- Properties testConfig = new Properties();
- testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
- testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
- testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
- FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig);
- assertTrue(InstantiationUtil.isSerializable(consumer));
- }
-
- // ----------------------------------------------------------------------
// Tests related to state initialization
// ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
new file mode 100644
index 0000000..d14ac04
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for KinesisConfigUtil.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+public class KinesisConfigUtilTest {
+ @Rule
+ private ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testUnparsableLongForProducerConfiguration() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty("RateLimit", "unparsableLong");
+
+ KinesisConfigUtil.validateProducerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testReplaceDeprecatedKeys() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
+ testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
+ Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);
+
+ assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
+ assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
+ }
+}