You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/02/03 16:06:17 UTC
[flink] branch master updated: [FLINK-25949][connector/firehose] Fixing wrong Http protocol default value for kinesisfirehoseSinkBuilder
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b12b749 [FLINK-25949][connector/firehose] Fixing wrong Http protocol default value for kinesisfirehoseSinkBuilder
b12b749 is described below
commit b12b749a3da491dd9760d7ce806b67cb8411d3f9
Author: Ahmed Hamdy <va...@amazon.com>
AuthorDate: Thu Feb 3 12:26:53 2022 +0000
[FLINK-25949][connector/firehose] Fixing wrong Http protocol default value for kinesisfirehoseSinkBuilder
---
.../connector/firehose/sink/KinesisFirehoseSinkBuilder.java | 6 ++++--
.../firehose/sink/KinesisFirehoseSinkBuilderTest.java | 13 +++++++++++++
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
index a180abc..087fd6b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.firehose.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
@@ -135,10 +136,11 @@ public class KinesisFirehoseSinkBuilder<InputT>
return this;
}
- private Properties getClientPropertiesWithDefaultHttpProtocol() {
+ @VisibleForTesting
+ Properties getClientPropertiesWithDefaultHttpProtocol() {
Properties clientProperties =
Optional.ofNullable(firehoseClientProperties).orElse(new Properties());
- clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL);
+ clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.toString());
return clientProperties;
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
index 88f4329..490b381 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -19,10 +19,13 @@ package org.apache.flink.connector.firehose.sink;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.assertj.core.api.Assertions;
import org.junit.Test;
+import java.util.Properties;
+
/** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
public class KinesisFirehoseSinkBuilderTest {
@@ -65,4 +68,14 @@ public class KinesisFirehoseSinkBuilderTest {
.withMessageContaining(
"The delivery stream name must be set when initializing the KDF Sink.");
}
+
+ @Test
+ public void defaultProtocolVersionInsertedToConfiguration() {
+ Properties expectedProps = new Properties();
+ expectedProps.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+ Properties defaultProperties =
+ KinesisFirehoseSink.<String>builder().getClientPropertiesWithDefaultHttpProtocol();
+
+ Assertions.assertThat(defaultProperties).isEqualTo(expectedProps);
+ }
}