You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/25 03:28:43 UTC
[pulsar] branch master updated: [flink] Allow to specify a custom Pulsar producer (#3894)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19fd91c [flink] Allow to specify a custom Pulsar producer (#3894)
19fd91c is described below
commit 19fd91c484bf40cf8677cd529d155f3210fd59d4
Author: Cristian <me...@cristian.io>
AuthorDate: Sun Mar 24 20:28:38 2019 -0700
[flink] Allow to specify a custom Pulsar producer (#3894)
This is necessary in pretty much any non-trivial use case. The ability
to control the settings of the Pulsar producer is paramount to
building real-life applications.
---
.../streaming/connectors/pulsar/FlinkPulsarProducer.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 27701c5..04e69f8 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -115,6 +115,14 @@ public class FlinkPulsarProducer<IN>
String defaultTopicName,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
+ this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
+ }
+
+ public FlinkPulsarProducer(String serviceUrl,
+ String defaultTopicName,
+ SerializationSchema<IN> serializationSchema,
+ PulsarKeyExtractor<IN> keyExtractor,
+ Producer<byte[]> producer) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
this.serviceUrl = serviceUrl;
@@ -122,6 +130,7 @@ public class FlinkPulsarProducer<IN>
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
+ this.producer = producer;
}
// ---------------------------------- Properties --------------------------
@@ -185,7 +194,10 @@ public class FlinkPulsarProducer<IN>
*/
@Override
public void open(Configuration parameters) throws Exception {
- this.producer = createProducer();
+ if (producer == null) {
+ // If no custom producer was specified create a default one
+ this.producer = createProducer();
+ }
RuntimeContext ctx = getRuntimeContext();