You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this text rendering.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/09/07 06:19:30 UTC

[6/6] flink git commit: [FLINK-7363] [kinesis] Clean up deprecation of ProducerConfigConstants

[FLINK-7363] [kinesis] Clean up deprecation of ProducerConfigConstants

- Improve deprecation message in Javadocs
- Remove usage of ProducerConfigConstants in code wherever possible
- Remove usage of ProducerConfigConstants in documentation code snippets

This closes #4473.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59eab454
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59eab454
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59eab454

Branch: refs/heads/master
Commit: 59eab45458b3b1637ccbc5dafd326cc84ffb9655
Parents: 9ed5d9a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Aug 15 13:46:29 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Sep 7 12:54:33 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 36 ++++++++++----------
 .../kinesis/config/ProducerConfigConstants.java | 19 +++++++----
 .../kinesis/examples/ProduceIntoKinesis.java    |  8 ++---
 .../manualtests/ManualConsumerProducerTest.java |  8 ++---
 .../kinesis/manualtests/ManualProducerTest.java |  8 ++---
 .../kinesis/util/KinesisConfigUtilTest.java     |  6 ++--
 6 files changed, 47 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 1eea308..3ffe1c4 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -271,9 +271,9 @@ For the monitoring to work, the user accessing the stream needs access to the Cl
 {% highlight java %}
 Properties producerConfig = new Properties();
 // Required configs
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
 // Optional configs
 producerConfig.put("AggregationMaxCount", "4294967295");
 producerConfig.put("CollectionMaxCount", "1000");
@@ -293,10 +293,10 @@ simpleStringStream.addSink(kinesis);
 {% highlight scala %}
 val producerConfig = new Properties();
 // Required configs
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-// Optional configs
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional KPL configs
 producerConfig.put("AggregationMaxCount", "4294967295");
 producerConfig.put("CollectionMaxCount", "1000");
 producerConfig.put("RecordTtl", "30000");
@@ -328,29 +328,29 @@ It is sometimes desirable to have Flink operate as a consumer or producer agains
 [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
 application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
 
-To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
-Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
+To override the AWS endpoint, taking the producer for example, set the `AWSConfigConstants.AWS_ENDPOINT` property in the
+Flink configuration, in addition to the `AWSConfigConstants.AWS_REGION` required by Flink. Although the region is
 required, it will not be used to determine the AWS endpoint URL.
 
-The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
+The following example shows how one might supply the `AWSConfigConstants.AWS_ENDPOINT` configuration property:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val producerConfig = new Properties();
-producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index 983687e..d66bb90 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,24 +20,31 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
 /**
- * @deprecated
- *
  * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
+ *
+ * @deprecated This class is deprecated in favor of the official AWS Kinesis producer configuration keys.
+ *             See <a href="https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties">
+ *             here</a> for the full list of available configs.
+ *             For configuring the region and credentials, please use the keys in {@link AWSConfigConstants}.
  */
 @Deprecated
 public class ProducerConfigConstants extends AWSConfigConstants {
 
 	/**
-	 * @deprecated
+	 * Deprecated key.
 	 *
-	 * Deprecated key. **/
+	 * @deprecated This is deprecated in favor of the official AWS Kinesis producer configuration keys.
+	 *             Please use {@code CollectionMaxCount} instead.
+	 **/
 	@Deprecated
 	public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
 
  	/**
-	 * @deprecated
+	 * Deprecated key.
 	 *
-	 * Deprecated key. **/
+	 * @deprecated This is deprecated in favor of the official AWS Kinesis producer configuration keys.
+	 *             Please use {@code AggregationMaxCount} instead.
+	 **/
 	@Deprecated
 	public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
index ee031eb..8d21c2c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 import org.apache.commons.lang3.RandomStringUtils;
@@ -43,9 +43,9 @@ public class ProduceIntoKinesis {
 		DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
 
 		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 
 		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 				new SimpleStringSchema(), kinesisProducerConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 2915e2f..a7470dc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -56,9 +56,9 @@ public class ManualConsumerProducerTest {
 		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
 
 		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 
 		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 				new KinesisSerializationSchema<String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 8abf4bb..fb49169 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 
@@ -53,9 +53,9 @@ public class ManualProducerTest {
 		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
 
 		Properties kinesisProducerConfig = new Properties();
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
 
 		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
 				new KinesisSerializationSchema<String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/59eab454/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index d14ac04..3b00058 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
 import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KinesisConfigUtilTest {
 		exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 		testConfig.setProperty("RateLimit", "unparsableLong");
 
 		KinesisConfigUtil.validateProducerConfiguration(testConfig);
@@ -55,7 +56,8 @@ public class KinesisConfigUtilTest {
 	@Test
 	public void testReplaceDeprecatedKeys() {
 		Properties testConfig = new Properties();
-		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		// these deprecated keys should be replaced
 		testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
 		testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
 		Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);