You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/10 17:03:29 UTC

[pulsar] branch master updated: [pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c71908  [pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)
6c71908 is described below

commit 6c7190803d407c0d1183566b20edf03031c99e26
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jun 10 10:03:22 2019 -0700

    [pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)
---
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 43 +++++++++++++++++-----
 .../pulsar/storm/PulsarSpoutConfiguration.java     | 14 -------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 0d1bebc..ed92987 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -31,10 +31,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.impl.Backoff;
@@ -68,6 +70,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
 
     private final ClientConfigurationData clientConf;
     private final PulsarSpoutConfiguration pulsarSpoutConf;
+    private final ConsumerConfigurationData<byte[]> consumerConf;
     private final long failedRetriesTimeoutNano;
     private final int maxFailedRetries;
     private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new ConcurrentHashMap<>();
@@ -86,14 +89,28 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     private volatile long pendingAcks = 0;
     private volatile long messageSizeReceived = 0;
 
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
+        this(pulsarSpoutConf, PulsarClient.builder());
+    }
+    
     public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
+        this(pulsarSpoutConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+                new ConsumerConfigurationData<byte[]>());
+    }
+
+    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfigurationData clientConfig,
+            ConsumerConfigurationData<byte[]> consumerConfig) {
         Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
         Objects.requireNonNull(pulsarSpoutConf.getTopic());
         Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
         Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
 
-        this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
+        checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
+        checkNotNull(clientConfig, "client configuration can't be null");
+        checkNotNull(consumerConfig, "consumer configuration can't be null");
+        this.clientConf = clientConfig;
         this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+        this.consumerConf = consumerConfig;
         this.pulsarSpoutConf = pulsarSpoutConf;
         this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
         this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -276,15 +293,15 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         PulsarSpoutConsumer consumer;
         if (pulsarSpoutConf.isSharedConsumerEnabled()) {
             consumer = pulsarSpoutConf.isDurableSubscription()
-                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration(pulsarSpoutConf)))
-                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration(pulsarSpoutConf)));
+                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
+                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
         } else {
             try {
                 consumer = pulsarSpoutConf.isDurableSubscription()
                         ? new SpoutConsumer(sharedPulsarClient.getClient()
-                                .subscribeAsync(newConsumerConfiguration(pulsarSpoutConf)).join())
+                                .subscribeAsync(newConsumerConfiguration()).join())
                         : new SpoutReader(sharedPulsarClient.getClient()
-                                .createReaderAsync(newReaderConfiguration(pulsarSpoutConf)).join());
+                                .createReaderAsync(newReaderConfiguration()).join());
             } catch (CompletionException e) {
                 throw (PulsarClientException) e.getCause();
             }
@@ -376,20 +393,26 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         return metrics;
     }
 
-    private ReaderConfigurationData<byte[]> newReaderConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
-        ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<> ();
+    private ReaderConfigurationData<byte[]> newReaderConfiguration() {
+        ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
         readerConf.setTopicName(pulsarSpoutConf.getTopic());
         readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
         readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+        if (this.consumerConf != null) {
+            readerConf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
+            readerConf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
+            readerConf.setReadCompacted(consumerConf.isReadCompacted());
+            readerConf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
+        }
         return readerConf;
     }
 
-    private ConsumerConfigurationData<byte[]> newConsumerConfiguration(PulsarSpoutConfiguration pulsarSpoutConf2) {
-        ConsumerConfigurationData<byte[]> consumerConf = new ConsumerConfigurationData<>();
+    private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
+        ConsumerConfigurationData<byte[]> consumerConf = this.consumerConf != null ? this.consumerConf
+                : new ConsumerConfigurationData<>();
         consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
         consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
         consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
-        consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
         return consumerConf;
     }
 
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index daa598f..0ef7686 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -47,7 +47,6 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
 
     private SubscriptionType subscriptionType = SubscriptionType.Shared;
     private boolean autoUnsubscribe = false;
-    private int consumerReceiverQueueSize = 1000;
     private boolean durableSubscription = true;
     // read position if non-durable subscription is enabled : default oldest message available in topic
     private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; 
@@ -77,19 +76,6 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
         this.subscriptionType = subscriptionType;
     }
 
-    public int getConsumerReceiverQueueSize() {
-        return consumerReceiverQueueSize;
-    }
-
-    /**
-     * Receiver queue size of pulsar-spout consumer.
-     * 
-     * @param consumerReceiverQueueSize
-     */
-    public void setConsumerReceiverQueueSize(int consumerReceiverQueueSize) {
-        this.consumerReceiverQueueSize = consumerReceiverQueueSize;
-    }
-    
     /**
      * @return the mapper to convert pulsar message to a storm tuple
      */