You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/10 08:09:39 UTC
[flink-statefun] branch master updated: [FLINK-16518] [kafka] Set
client properties as strings in KafkaSinkProvider
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/master by this push:
new ab491a8 [FLINK-16518] [kafka] Set client properties as strings in KafkaSinkProvider
ab491a8 is described below
commit ab491a80db6e0ac5bb44d7a19f99d8bb88f7a1e9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 10 10:56:44 2020 +0800
[FLINK-16518] [kafka] Set client properties as strings in KafkaSinkProvider
This closes #55.
---
.../flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java | 2 +-
.../flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java | 2 +-
.../apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java | 7 +++++--
.../org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java | 4 ++--
.../org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java | 4 +++-
5 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
index a61a816..81f2947 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
@@ -50,7 +50,7 @@ final class KafkaEgressSpecJsonParser {
static Properties kafkaClientProperties(JsonNode json) {
Map<String, String> kvs = Selectors.propertiesAt(json, PROPERTIES_POINTER);
Properties properties = new Properties();
- kvs.forEach(properties::put);
+ kvs.forEach(properties::setProperty);
return properties;
}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java
index a9f729a..91fbc50 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaIngressSpecJsonParser.java
@@ -99,7 +99,7 @@ final class KafkaIngressSpecJsonParser {
static Properties kafkaClientProperties(JsonNode json) {
Map<String, String> kvs = Selectors.propertiesAt(json, PROPERTIES_POINTER);
Properties properties = new Properties();
- kvs.forEach(properties::put);
+ kvs.forEach(properties::setProperty);
return properties;
}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java
index e6d94ce..ba0f941 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaSinkProvider.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.producer.ProducerConfig;
public class KafkaSinkProvider implements SinkProvider {
@@ -39,11 +40,13 @@ public class KafkaSinkProvider implements SinkProvider {
Properties properties = new Properties();
properties.putAll(spec.properties());
- properties.put("bootstrap.servers", spec.kafkaAddress());
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.kafkaAddress());
Semantic producerSemantic = semanticFromSpec(spec);
if (producerSemantic == Semantic.EXACTLY_ONCE) {
- properties.put("transaction.timeout.ms", spec.transactionTimeoutDuration().toMillis());
+ properties.setProperty(
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+ String.valueOf(spec.transactionTimeoutDuration().toMillis()));
}
return new FlinkKafkaProducer<>(
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java
index ae88ce1..9abe95f 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java
+++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaEgressBuilder.java
@@ -59,10 +59,10 @@ public final class KafkaEgressBuilder<OutT> {
}
/** A configuration property for the KafkaProducer. */
- public KafkaEgressBuilder<OutT> withProperty(String key, Object value) {
+ public KafkaEgressBuilder<OutT> withProperty(String key, String value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);
- properties.put(key, value);
+ properties.setProperty(key, value);
return this;
}
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
index 6acceba..d432959 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
+++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
@@ -93,7 +93,9 @@ public final class KafkaIngressBuilder<T> {
/** A configuration property for the KafkaProducer. */
public KafkaIngressBuilder<T> withProperty(String name, String value) {
- this.properties.put(name, value);
+ Objects.requireNonNull(name);
+ Objects.requireNonNull(value);
+ this.properties.setProperty(name, value);
return this;
}