You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/09/06 15:58:42 UTC

[flink] branch release-1.15 updated: [FLINK-29205][connectors/kinesis] Pass through user config to HTTP client when constructing Async Client for Kinesis EFO

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e1cbf9ed54c [FLINK-29205][connectors/kinesis] Pass through user config to HTTP client when constructing Async Client for Kinesis EFO
e1cbf9ed54c is described below

commit e1cbf9ed54c04118f3d6f6a89fc23599be0a0bb7
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Tue Sep 6 13:14:39 2022 +0100

    [FLINK-29205][connectors/kinesis] Pass through user config to HTTP client when constructing Async Client for Kinesis EFO
---
 .../connectors/kinesis/proxy/KinesisProxyV2Factory.java     | 10 ++++------
 .../connectors/kinesis/util/KinesisConfigUtil.java          | 13 +++++++++++++
 .../connectors/kinesis/util/KinesisConfigUtilTest.java      | 12 ++++++++++++
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index f8cf6371519..24c3cb3b6d6 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
 import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.Preconditions;
 
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
@@ -61,15 +62,12 @@ public class KinesisProxyV2Factory {
         final FanOutRecordPublisherConfiguration configuration =
                 new FanOutRecordPublisherConfiguration(configProps, emptyList());
 
-        Properties legacyConfigProps = new Properties(configProps);
-        legacyConfigProps.setProperty(
-                KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
-                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
-                        KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+        Properties asyncClientProperties =
+                KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps);
 
         final KinesisAsyncClient client =
                 AWSAsyncSinkUtil.createAwsAsyncClient(
-                        legacyConfigProps,
+                        asyncClientProperties,
                         httpClient,
                         KinesisAsyncClient.builder(),
                         KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9f6965a7957..dbcfcfef4a7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -18,7 +18,9 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
@@ -537,6 +539,17 @@ public class KinesisConfigUtil {
         }
     }
 
+    public static Properties getV2ConsumerAsyncClientProperties(final Properties configProps) {
+        Properties asyncClientProperties = new Properties();
+        asyncClientProperties.putAll(configProps);
+        asyncClientProperties.setProperty(
+                KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
+                AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+                        KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+
+        return asyncClientProperties;
+    }
+
     private static void validateOptionalPositiveLongProperty(
             Properties config, String key, String message) {
         if (config.containsKey(key)) {
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index fa26cfb6565..085d9268d20 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -41,6 +41,7 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -1014,4 +1015,15 @@ public class KinesisConfigUtilTest {
 
         KinesisConfigUtil.validateConsumerConfiguration(testConfig);
     }
+
+    @Test
+    public void testGetV2ConsumerAsyncClientProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("retained", "property");
+
+        assertThat(KinesisConfigUtil.getV2ConsumerAsyncClientProperties(properties))
+                .containsEntry("retained", "property")
+                .containsKey("aws.kinesis.client.user-agent-prefix")
+                .hasSize(2);
+    }
 }