You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/03 13:17:07 UTC
[flink] 04/06: [FLINK-18513][Kinesis] Inverting dependency control of KinesisProxyV2
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3423bb8b797df54d2b1f7b1af4020629d25cf8b9
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Mon Jul 27 16:39:07 2020 +0100
[FLINK-18513][Kinesis] Inverting dependency control of KinesisProxyV2
This closes #12944.
---
.../connectors/kinesis/proxy/KinesisProxyV2.java | 24 ++++------------------
.../connectors/kinesis/util/AwsV2Util.java | 14 +++++++++++++
2 files changed, 18 insertions(+), 20 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
index 8c7fb52..d1310e5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -18,14 +18,10 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+import org.apache.flink.util.Preconditions;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.ClientConfigurationFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import java.util.Properties;
-
/**
* Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
* calls to AWS Kinesis for several EFO (Enhanced Fan Out) functions, such as de-/registering stream consumers,
@@ -39,22 +35,10 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface {
/**
* Create a new KinesisProxyV2 based on the supplied configuration properties.
*
- * @param configProps configuration properties containing AWS credential and AWS region info
- */
- public KinesisProxyV2(final Properties configProps) {
- this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
- }
-
- /**
- * Create the Kinesis client, using the provided configuration properties.
- * Derived classes can override this method to customize the client configuration.
- *
- * @param configProps the properties map used to create the Kinesis Client
- * @return a Kinesis Client
+ * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
*/
- protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
- final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
- return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
+ this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient);
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index f5d95df..c4073c3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -61,6 +62,19 @@ public class AwsV2Util {
* - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
*
* @param configProps configuration properties
+ * @return a new Amazon Kinesis Client
+ */
+ public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+ final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+ return createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * Creates an Amazon Kinesis Async Client from the provided properties.
+ * Configuration is copied from AWS SDK v1 configuration class as per:
+ * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
+ *
+ * @param configProps configuration properties
* @param config the AWS SDK v1.x client configuration used to create the client
* @return a new Amazon Kinesis Client
*/