You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/06/09 18:08:00 UTC
[pulsar] branch master updated: [pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fbb3c18 [pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
fbb3c18 is described below
commit fbb3c18ce14f9a852c2e0538e1d2e9487a611e99
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jun 9 11:07:55 2019 -0700
[pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
---
.../java/org/apache/pulsar/storm/PulsarBolt.java | 24 +++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 5259b5b..8432b1c 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -42,6 +43,7 @@ import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
@@ -67,18 +69,30 @@ public class PulsarBolt extends BaseRichBolt implements IMetric {
private volatile long messagesSent = 0;
private volatile long messageSizeSent = 0;
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf) {
+ this(pulsarBoltConf, PulsarClient.builder());
+ }
+
public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) {
- this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
- this.producerConf = new ProducerConfigurationData();
+ this(pulsarBoltConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+ new ProducerConfigurationData());
+ }
+
+ public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfigurationData clientConf,
+ ProducerConfigurationData producerConf) {
+ checkNotNull(pulsarBoltConf, "bolt configuration can't be null");
+ checkNotNull(clientConf, "client configuration can't be null");
+ checkNotNull(producerConf, "producer configuration can't be null");
Objects.requireNonNull(pulsarBoltConf.getServiceUrl());
Objects.requireNonNull(pulsarBoltConf.getTopic());
Objects.requireNonNull(pulsarBoltConf.getTupleToMessageMapper());
-
+ this.pulsarBoltConf = pulsarBoltConf;
+ this.clientConf = clientConf;
+ this.producerConf = producerConf;
this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl());
this.producerConf.setTopicName(pulsarBoltConf.getTopic());
- this.pulsarBoltConf = pulsarBoltConf;
}
-
+
@SuppressWarnings({ "rawtypes" })
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {