You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/03 13:17:07 UTC

[flink] 04/06: [FLINK-18513][Kinesis] Inverting dependency control of KinesisProxyV2

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3423bb8b797df54d2b1f7b1af4020629d25cf8b9
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Mon Jul 27 16:39:07 2020 +0100

    [FLINK-18513][Kinesis] Inverting dependency control of KinesisProxyV2
    
    This closes #12944.
---
 .../connectors/kinesis/proxy/KinesisProxyV2.java   | 24 ++++------------------
 .../connectors/kinesis/util/AwsV2Util.java         | 14 +++++++++++++
 2 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
index 8c7fb52..d1310e5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -18,14 +18,10 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+import org.apache.flink.util.Preconditions;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.ClientConfigurationFactory;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 
-import java.util.Properties;
-
 /**
  * Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
  * calls to AWS Kinesis for several EFO (Enhanced Fan Out) functions, such as de-/registering stream consumers,
@@ -39,22 +35,10 @@ public class KinesisProxyV2 implements KinesisProxyV2Interface {
 	/**
 	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
 	 *
-	 * @param configProps configuration properties containing AWS credential and AWS region info
-	 */
-	public KinesisProxyV2(final Properties configProps) {
-		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
-	}
-
-	/**
-	 * Create the Kinesis client, using the provided configuration properties.
-	 * Derived classes can override this method to customize the client configuration.
-	 *
-	 * @param configProps the properties map used to create the Kinesis Client
-	 * @return a Kinesis Client
+	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
 	 */
-	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
-		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
-		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
+		this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient);
 	}
 
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index f5d95df..c4073c3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
 
 import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -61,6 +62,19 @@ public class AwsV2Util {
 	 * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
 	 *
 	 * @param configProps configuration properties
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return createKinesisAsyncClient(configProps, config);
+	}
+
+	/**
+	 * Creates an Amazon Kinesis Async Client from the provided properties.
+	 * Configuration is copied from AWS SDK v1 configuration class as per:
+	 * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
+	 *
+	 * @param configProps configuration properties
 	 * @param config the AWS SDK v1.x client configuration used to create the client
 	 * @return a new Amazon Kinesis Client
 	 */