You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/25 03:28:43 UTC

[pulsar] branch master updated: [flink] Allow to specify a custom Pulsar producer (#3894)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19fd91c  [flink] Allow to specify a custom Pulsar producer (#3894)
19fd91c is described below

commit 19fd91c484bf40cf8677cd529d155f3210fd59d4
Author: Cristian <me...@cristian.io>
AuthorDate: Sun Mar 24 20:28:38 2019 -0700

    [flink] Allow to specify a custom Pulsar producer (#3894)
    
    This is necessary in pretty much any non-trivial use-case. The ability
    to control the settings of the Pulsar producer is paramount to
    building real-life applications
---
 .../streaming/connectors/pulsar/FlinkPulsarProducer.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 27701c5..04e69f8 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -115,6 +115,14 @@ public class FlinkPulsarProducer<IN>
                                String defaultTopicName,
                                SerializationSchema<IN> serializationSchema,
                                PulsarKeyExtractor<IN> keyExtractor) {
+        this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
+    }
+
+    public FlinkPulsarProducer(String serviceUrl,
+                               String defaultTopicName,
+                               SerializationSchema<IN> serializationSchema,
+                               PulsarKeyExtractor<IN> keyExtractor,
+                               Producer<byte[]> producer) {
         checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
         this.serviceUrl = serviceUrl;
@@ -122,6 +130,7 @@ public class FlinkPulsarProducer<IN>
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
+        this.producer = producer;
     }
 
     // ---------------------------------- Properties --------------------------
@@ -185,7 +194,10 @@ public class FlinkPulsarProducer<IN>
      */
     @Override
     public void open(Configuration parameters) throws Exception {
-        this.producer = createProducer();
+        if (producer == null) {
+            // If no custom producer was specified create a default one
+            this.producer = createProducer();
+        }
 
         RuntimeContext ctx = getRuntimeContext();