You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/02/03 16:06:17 UTC

[flink] branch master updated: [FLINK-25949][connector/firehose] Fixing wrong HTTP protocol default value for KinesisFirehoseSinkBuilder

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b12b749  [FLINK-25949][connector/firehose] Fixing wrong HTTP protocol default value for KinesisFirehoseSinkBuilder
b12b749 is described below

commit b12b749a3da491dd9760d7ce806b67cb8411d3f9
Author: Ahmed Hamdy <va...@amazon.com>
AuthorDate: Thu Feb 3 12:26:53 2022 +0000

    [FLINK-25949][connector/firehose] Fixing wrong HTTP protocol default value for KinesisFirehoseSinkBuilder
---
 .../connector/firehose/sink/KinesisFirehoseSinkBuilder.java |  6 ++++--
 .../firehose/sink/KinesisFirehoseSinkBuilderTest.java       | 13 +++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
index a180abc..087fd6b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
 
@@ -135,10 +136,11 @@ public class KinesisFirehoseSinkBuilder<InputT>
         return this;
     }
 
-    private Properties getClientPropertiesWithDefaultHttpProtocol() {
+    @VisibleForTesting
+    Properties getClientPropertiesWithDefaultHttpProtocol() {
         Properties clientProperties =
                 Optional.ofNullable(firehoseClientProperties).orElse(new Properties());
-        clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL);
+        clientProperties.putIfAbsent(HTTP_PROTOCOL_VERSION, DEFAULT_HTTP_PROTOCOL.toString());
         return clientProperties;
     }
 
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
index 88f4329..490b381 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkBuilderTest.java
@@ -19,10 +19,13 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
+import java.util.Properties;
+
 /** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkBuilder}. */
 public class KinesisFirehoseSinkBuilderTest {
 
@@ -65,4 +68,14 @@ public class KinesisFirehoseSinkBuilderTest {
                 .withMessageContaining(
                         "The delivery stream name must be set when initializing the KDF Sink.");
     }
+
+    @Test
+    public void defaultProtocolVersionInsertedToConfiguration() {
+        Properties expectedProps = new Properties();
+        expectedProps.setProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        Properties defaultProperties =
+                KinesisFirehoseSink.<String>builder().getClientPropertiesWithDefaultHttpProtocol();
+
+        Assertions.assertThat(defaultProperties).isEqualTo(expectedProps);
+    }
 }