You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/09/06 15:58:42 UTC
[flink] branch release-1.15 updated: [FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new e1cbf9ed54c [FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO
e1cbf9ed54c is described below
commit e1cbf9ed54c04118f3d6f6a89fc23599be0a0bb7
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Tue Sep 6 13:14:39 2022 +0100
[FLINK-29205][connectors/kinesis] Passthrough use config to HTTP client when constructing Async Client for Kinesis EFO
---
.../connectors/kinesis/proxy/KinesisProxyV2Factory.java | 10 ++++------
.../connectors/kinesis/util/KinesisConfigUtil.java | 13 +++++++++++++
.../connectors/kinesis/util/KinesisConfigUtilTest.java | 12 ++++++++++++
3 files changed, 29 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index f8cf6371519..24c3cb3b6d6 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
@@ -61,15 +62,12 @@ public class KinesisProxyV2Factory {
final FanOutRecordPublisherConfiguration configuration =
new FanOutRecordPublisherConfiguration(configProps, emptyList());
- Properties legacyConfigProps = new Properties(configProps);
- legacyConfigProps.setProperty(
- KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
- AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
- KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+ Properties asyncClientProperties =
+ KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps);
final KinesisAsyncClient client =
AWSAsyncSinkUtil.createAwsAsyncClient(
- legacyConfigProps,
+ asyncClientProperties,
httpClient,
KinesisAsyncClient.builder(),
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9f6965a7957..dbcfcfef4a7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -18,7 +18,9 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
@@ -537,6 +539,17 @@ public class KinesisConfigUtil {
}
}
+ public static Properties getV2ConsumerAsyncClientProperties(final Properties configProps) {
+ Properties asyncClientProperties = new Properties();
+ asyncClientProperties.putAll(configProps);
+ asyncClientProperties.setProperty(
+ KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX,
+ AWSAsyncSinkUtil.formatFlinkUserAgentPrefix(
+ KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT));
+
+ return asyncClientProperties;
+ }
+
private static void validateOptionalPositiveLongProperty(
Properties config, String key, String message) {
if (config.containsKey(key)) {
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index fa26cfb6565..085d9268d20 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -41,6 +41,7 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -1014,4 +1015,15 @@ public class KinesisConfigUtilTest {
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
+
+ @Test
+ public void testGetV2ConsumerAsyncClientProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("retained", "property");
+
+ assertThat(KinesisConfigUtil.getV2ConsumerAsyncClientProperties(properties))
+ .containsEntry("retained", "property")
+ .containsKey("aws.kinesis.client.user-agent-prefix")
+ .hasSize(2);
+ }
}