You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:30 UTC
[6/6] flink git commit: [FLINK-7363] [kinesis] Clean up deprecation
of ProducerConfigConstants
[FLINK-7363] [kinesis] Clean up deprecation of ProducerConfigConstants
- Improve deprecation message in Javadocs
- Remove usage of ProducerConfigConstants in code wherever possible
- Remove usage of ProducerConfigConstants in documentation code snippets
This closes #4473.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59eab454
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59eab454
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59eab454
Branch: refs/heads/master
Commit: 59eab45458b3b1637ccbc5dafd326cc84ffb9655
Parents: 9ed5d9a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Aug 15 13:46:29 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:33 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kinesis.md | 36 ++++++++++----------
.../kinesis/config/ProducerConfigConstants.java | 19 +++++++----
.../kinesis/examples/ProduceIntoKinesis.java | 8 ++---
.../manualtests/ManualConsumerProducerTest.java | 8 ++---
.../kinesis/manualtests/ManualProducerTest.java | 8 ++---
.../kinesis/util/KinesisConfigUtilTest.java | 6 ++--
6 files changed, 47 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 1eea308..3ffe1c4 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the stream needs access to the Cl
{% highlight java %}
Properties producerConfig = new Properties();
// Required configs
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
@@ -293,10 +293,10 @@ simpleStringStream.addSink(kinesis);
{% highlight scala %}
val producerConfig = new Properties();
// Required configs
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-// Optional configs
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional KPL configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
@@ -328,29 +328,29 @@ It is sometimes desirable to have Flink operate as a consumer or producer agains
[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
-To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
-Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
+To override the AWS endpoint, taking the producer for example, set the `AWSConfigConstants.AWS_ENDPOINT` property in the
+Flink configuration, in addition to the `AWSConfigConstants.AWS_REGION` required by Flink. Although the region is
required, it will not be used to determine the AWS endpoint URL.
-The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
+The following example shows how one might supply the `AWSConfigConstants.AWS_ENDPOINT` configuration property:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index 983687e..d66bb90 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,24 +20,31 @@ package org.apache.flink.streaming.connectors.kinesis.config;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
/**
- * @deprecated
- *
* Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
+ *
+ * @deprecated This class is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ * See <a href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">
+ * here</a> for the full list of available configs.
+ * For configuring the region and credentials, please use the keys in {@link AWSConfigConstants}.
*/
@Deprecated
public class ProducerConfigConstants extends AWSConfigConstants {
/**
- * @deprecated
+ * Deprecated key.
*
- * Deprecated key. **/
+ * @deprecated This is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ * Please use {@code CollectionMaxCount} instead.
+ **/
@Deprecated
public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
/**
- * @deprecated
+ * Deprecated key.
*
- * Deprecated key. **/
+ * @deprecated This is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ * Please use {@code AggregationMaxCount} instead.
+ **/
@Deprecated
public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
index ee031eb..8d21c2c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.commons.lang3.RandomStringUtils;
@@ -43,9 +43,9 @@ public class ProduceIntoKinesis {
DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
Properties kinesisProducerConfig = new Properties();
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
new SimpleStringSchema(), kinesisProducerConfig);
http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 2915e2f..a7470dc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -56,9 +56,9 @@ public class ManualConsumerProducerTest {
DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
Properties kinesisProducerConfig = new Properties();
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
new KinesisSerializationSchema<String>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 8abf4bb..fb49169 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
@@ -53,9 +53,9 @@ public class ManualProducerTest {
DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
Properties kinesisProducerConfig = new Properties();
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+ kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
new KinesisSerializationSchema<String>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index d14ac04..3b00058 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KinesisConfigUtilTest {
exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
Properties testConfig = new Properties();
- testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
testConfig.setProperty("RateLimit", "unparsableLong");
KinesisConfigUtil.validateProducerConfiguration(testConfig);
@@ -55,7 +56,8 @@ public class KinesisConfigUtilTest {
@Test
public void testReplaceDeprecatedKeys() {
Properties testConfig = new Properties();
- testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ // these deprecated keys should be replaced
testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);