You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/06/09 18:08:00 UTC

[pulsar] branch master updated: [pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fbb3c18  [pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
fbb3c18 is described below

commit fbb3c18ce14f9a852c2e0538e1d2e9487a611e99
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jun 9 11:07:55 2019 -0700

    [pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
---
 .../java/org/apache/pulsar/storm/PulsarBolt.java   | 24 +++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 5259b5b..8432b1c 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -42,6 +43,7 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public class PulsarBolt extends BaseRichBolt implements IMetric {
     /**
@@ -67,18 +69,30 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
     private volatile long messagesSent = 0;
     private volatile long messageSizeSent = 0;
 
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
+        this(pulsarBoltConf, PulsarClient.builder());
+    }
+
     public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
-        this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
-        this.producerConf = new ProducerConfigurationData();
+        this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+                new ProducerConfigurationData());
+    }
+
+    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf,
+            ProducerConfigurationData producerConf) {
+        checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
+        checkNotNull(clientConf, "client configuration can't be null");
+        checkNotNull(producerConf, "producer configuration can't be null");
         Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
         Objects.requireNonNull(pulsarBoltConf.getTopic());
         Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
-
+        this.pulsarBoltConf = pulsarBoltConf;
+        this.clientConf = clientConf;
+        this.producerConf = producerConf;
         this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
         this.producerConf.setTopicName(pulsarBoltConf.getTopic());
-        this.pulsarBoltConf = pulsarBoltConf;
     }
-
+    
     @SuppressWarnings({ "rawtypes" })
     @Override
     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {