You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/10 08:09:39 UTC

[flink-statefun] branch master updated: [FLINK-16518] [kafka] Set client properties as strings in KafkaSinkProvider

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new ab491a8  [FLINK-16518] [kafka] Set client properties as strings in KafkaSinkProvider
ab491a8 is described below

commit ab491a80db6e0ac5bb44d7a19f99d8bb88f7a1e9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 10 10:56:44 2020 +0800

    [FLINK-16518] [kafka] Set client properties as strings in KafkaSinkProvider
    
    This closes #55.
---
 .../flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java   | 2 +-
 .../flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java  | 2 +-
 .../apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java    | 7 +++++--
 .../org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java    | 4 ++--
 .../org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java   | 4 +++-
 5 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
index a61a816..81f2947 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
@@ -50,7 +50,7 @@ final class KafkaEgressSpecJsonParser {
   static Properties kafkaClientProperties(JsonNode json) {
     Map<String, String> kvs = Selectors.propertiesAt(json, PROPERTIES_POINTER);
     Properties properties = new Properties();
-    kvs.forEach(properties::put);
+    kvs.forEach(properties::setProperty);
     return properties;
   }
 
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java
index a9f729a..91fbc50 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java
@@ -99,7 +99,7 @@ final class KafkaIngressSpecJsonParser {
   static Properties kafkaClientProperties(JsonNode json) {
     Map<String, String> kvs = Selectors.propertiesAt(json, PROPERTIES_POINTER);
     Properties properties = new Properties();
-    kvs.forEach(properties::put);
+    kvs.forEach(properties::setProperty);
     return properties;
   }
 
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java
index e6d94ce..ba0f941 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 public class KafkaSinkProvider implements SinkProvider {
 
@@ -39,11 +40,13 @@ public class KafkaSinkProvider implements SinkProvider {
 
     Properties properties = new Properties();
     properties.putAll(spec.properties());
-    properties.put("bootstrap.servers", spec.kafkaAddress());
+    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.kafkaAddress());
 
     Semantic producerSemantic = semanticFromSpec(spec);
     if (producerSemantic == Semantic.EXACTLY_ONCE) {
-      properties.put("transaction.timeout.ms", spec.transactionTimeoutDuration().toMillis());
+      properties.setProperty(
+          ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+          String.valueOf(spec.transactionTimeoutDuration().toMillis()));
     }
 
     return new FlinkKafkaProducer<>(
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java
index ae88ce1..9abe95f 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java
+++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java
@@ -59,10 +59,10 @@ public final class KafkaEgressBuilder<OutT> {
   }
 
   /** A configuration property for the KafkaProducer. */
-  public KafkaEgressBuilder<OutT> withProperty(String key, Object value) {
+  public KafkaEgressBuilder<OutT> withProperty(String key, String value) {
     Objects.requireNonNull(key);
     Objects.requireNonNull(value);
-    properties.put(key, value);
+    properties.setProperty(key, value);
     return this;
   }
 
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
index 6acceba..d432959 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
+++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
@@ -93,7 +93,9 @@ public final class KafkaIngressBuilder<T> {
 
   /** A configuration property for the KafkaProducer. */
   public KafkaIngressBuilder<T> withProperty(String name, String value) {
-    this.properties.put(name, value);
+    Objects.requireNonNull(name);
+    Objects.requireNonNull(value);
+    this.properties.setProperty(name, value);
     return this;
   }