You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/10 17:03:29 UTC
[pulsar] branch master updated: [pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c71908 [pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)
6c71908 is described below
commit 6c7190803d407c0d1183566b20edf03031c99e26
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jun 10 10:03:22 2019 -0700
[pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)
---
.../java/org/apache/pulsar/storm/PulsarSpout.java | 43 +++++++++++++++++-----
.../pulsar/storm/PulsarSpoutConfiguration.java | 14 -------
2 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 0d1bebc..ed92987 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -31,10 +31,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.Backoff;
@@ -68,6 +70,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
private final ClientConfigurationData clientConf;
private final PulsarSpoutConfiguration pulsarSpoutConf;
+ private final ConsumerConfigurationData<byte[]> consumerConf;
private final long failedRetriesTimeoutNano;
private final int maxFailedRetries;
private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = new ConcurrentHashMap<>();
@@ -86,14 +89,28 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
private volatile long pendingAcks = 0;
private volatile long messageSizeReceived = 0;
+ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf) {
+ this(pulsarSpoutConf, PulsarClient.builder());
+ }
+
public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) {
+ this(pulsarSpoutConf, ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(),
+ new ConsumerConfigurationData<byte[]>());
+ }
+
+ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfigurationData clientConfig,
+ ConsumerConfigurationData<byte[]> consumerConfig) {
Objects.requireNonNull(pulsarSpoutConf.getServiceUrl());
Objects.requireNonNull(pulsarSpoutConf.getTopic());
Objects.requireNonNull(pulsarSpoutConf.getSubscriptionName());
Objects.requireNonNull(pulsarSpoutConf.getMessageToValuesMapper());
- this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
+ checkNotNull(pulsarSpoutConf, "spout configuration can't be null");
+ checkNotNull(clientConfig, "client configuration can't be null");
+ checkNotNull(consumerConfig, "consumer configuration can't be null");
+ this.clientConf = clientConfig;
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
+ this.consumerConf = consumerConfig;
this.pulsarSpoutConf = pulsarSpoutConf;
this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -276,15 +293,15 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
PulsarSpoutConsumer consumer;
if (pulsarSpoutConf.isSharedConsumerEnabled()) {
consumer = pulsarSpoutConf.isDurableSubscription()
- ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration(pulsarSpoutConf)))
- : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration(pulsarSpoutConf)));
+ ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration()))
+ : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration()));
} else {
try {
consumer = pulsarSpoutConf.isDurableSubscription()
? new SpoutConsumer(sharedPulsarClient.getClient()
- .subscribeAsync(newConsumerConfiguration(pulsarSpoutConf)).join())
+ .subscribeAsync(newConsumerConfiguration()).join())
: new SpoutReader(sharedPulsarClient.getClient()
- .createReaderAsync(newReaderConfiguration(pulsarSpoutConf)).join());
+ .createReaderAsync(newReaderConfiguration()).join());
} catch (CompletionException e) {
throw (PulsarClientException) e.getCause();
}
@@ -376,20 +393,26 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
return metrics;
}
- private ReaderConfigurationData<byte[]> newReaderConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
- ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<> ();
+ private ReaderConfigurationData<byte[]> newReaderConfiguration() {
+ ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
readerConf.setTopicName(pulsarSpoutConf.getTopic());
readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+ if (this.consumerConf != null) {
+ readerConf.setCryptoFailureAction(consumerConf.getCryptoFailureAction());
+ readerConf.setCryptoKeyReader(consumerConf.getCryptoKeyReader());
+ readerConf.setReadCompacted(consumerConf.isReadCompacted());
+ readerConf.setReceiverQueueSize(consumerConf.getReceiverQueueSize());
+ }
return readerConf;
}
- private ConsumerConfigurationData<byte[]> newConsumerConfiguration(PulsarSpoutConfiguration pulsarSpoutConf2) {
- ConsumerConfigurationData<byte[]> consumerConf = new ConsumerConfigurationData<>();
+ private ConsumerConfigurationData<byte[]> newConsumerConfiguration() {
+ ConsumerConfigurationData<byte[]> consumerConf = this.consumerConf != null ? this.consumerConf
+ : new ConsumerConfigurationData<>();
consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
- consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
return consumerConf;
}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index daa598f..0ef7686 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -47,7 +47,6 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
private SubscriptionType subscriptionType = SubscriptionType.Shared;
private boolean autoUnsubscribe = false;
- private int consumerReceiverQueueSize = 1000;
private boolean durableSubscription = true;
// read position if non-durable subscription is enabled : default oldest message available in topic
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
@@ -77,19 +76,6 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
this.subscriptionType = subscriptionType;
}
- public int getConsumerReceiverQueueSize() {
- return consumerReceiverQueueSize;
- }
-
- /**
- * Receiver queue size of pulsar-spout consumer.
- *
- * @param consumerReceiverQueueSize
- */
- public void setConsumerReceiverQueueSize(int consumerReceiverQueueSize) {
- this.consumerReceiverQueueSize = consumerReceiverQueueSize;
- }
-
/**
* @return the mapper to convert pulsar message to a storm tuple
*/