You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/05/20 20:22:28 UTC
[pulsar] branch master updated: [pulsar-storm] support reader for
pulsar-spout (#4236)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 407c445 [pulsar-storm] support reader for pulsar-spout (#4236)
407c445 is described below
commit 407c4452e904c111f0a02d1c970ac1ef9de834fc
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon May 20 13:22:23 2019 -0700
[pulsar-storm] support reader for pulsar-spout (#4236)
* [pulsar-storm] pulsar-spout can use a reader to read messages without a durable subscription
* fix test
---
.../java/org/apache/pulsar/storm/PulsarSpout.java | 132 +++++++++++++++++----
.../pulsar/storm/PulsarSpoutConfiguration.java | 32 +++++
.../apache/pulsar/storm/PulsarSpoutConsumer.java | 58 +++++++++
.../apache/pulsar/storm/SharedPulsarClient.java | 21 ++++
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 67 ++++++++++-
5 files changed, 289 insertions(+), 21 deletions(-)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 5a5ea59..0d1bebc 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.storm;
import static java.lang.String.format;
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -35,12 +36,13 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.shade.org.eclipse.jetty.util.log.Log;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -65,7 +67,6 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
private final ClientConfigurationData clientConf;
- private final ConsumerConfigurationData<byte[]> consumerConf;
private final PulsarSpoutConfiguration pulsarSpoutConf;
private final long failedRetriesTimeoutNano;
private final int maxFailedRetries;
@@ -77,7 +78,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
private String componentId;
private String spoutId;
private SpoutOutputCollector collector;
- private Consumer<byte[]> consumer;
+ private PulsarSpoutConsumer consumer;
private volatile long messagesReceived = 0;
private volatile long messagesEmitted = 0;
private volatile long messagesFailed = 0;
@@ -93,12 +94,6 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
- this.consumerConf = new ConsumerConfigurationData<>();
- this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
- this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
- this.consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
- this.consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
-
this.pulsarSpoutConf = pulsarSpoutConf;
this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -110,7 +105,12 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
if (pulsarSpoutConf.isAutoUnsubscribe()) {
- consumer.unsubscribe();
+ try {
+ consumer.unsubscribe();
+ }catch(PulsarClientException e) {
+ LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId,
+ this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e);
+ }
}
if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
@@ -259,16 +259,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
pendingMessageRetries.clear();
failedMessages.clear();
try {
- sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
- if (pulsarSpoutConf.isSharedConsumerEnabled()) {
- consumer = sharedPulsarClient.getSharedConsumer(consumerConf);
- } else {
- try {
- consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join();
- } catch (CompletionException e) {
- throw (PulsarClientException) e.getCause();
- }
- }
+ consumer = createConsumer();
LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
} catch (PulsarClientException e) {
@@ -280,6 +271,27 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
pulsarSpoutConf.getMetricsTimeIntervalInSecs());
}
+    /**
+     * Creates the message source this spout polls, and (re)binds {@code sharedPulsarClient} for this component.
+     *
+     * <p>A durable subscription yields a {@link SpoutConsumer}; a non-durable one yields a {@link SpoutReader}.
+     * When shared consumers are enabled, the consumer/reader is obtained through {@link SharedPulsarClient};
+     * otherwise a dedicated one is created on the shared client.
+     *
+     * @return the consumer wrapper the spout will poll
+     * @throws PulsarClientException if the consumer or reader could not be created
+     */
+    private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
+        sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+        if (pulsarSpoutConf.isSharedConsumerEnabled()) {
+            return pulsarSpoutConf.isDurableSubscription()
+                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration(pulsarSpoutConf)))
+                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration(pulsarSpoutConf)));
+        }
+        try {
+            return pulsarSpoutConf.isDurableSubscription()
+                    ? new SpoutConsumer(sharedPulsarClient.getClient()
+                            .subscribeAsync(newConsumerConfiguration(pulsarSpoutConf)).join())
+                    : new SpoutReader(sharedPulsarClient.getClient()
+                            .createReaderAsync(newReaderConfiguration(pulsarSpoutConf)).join());
+        } catch (CompletionException e) {
+            // join() wraps async failures. Surface a PulsarClientException as-is and wrap anything else,
+            // instead of a blind cast that would turn an unexpected cause into a ClassCastException.
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            if (cause instanceof PulsarClientException) {
+                throw (PulsarClientException) cause;
+            }
+            throw new PulsarClientException(cause);
+        }
+    }
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
@@ -363,4 +375,84 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
resetMetrics();
return metrics;
}
+
+    /**
+     * Builds the reader configuration used when the spout runs with a non-durable subscription.
+     *
+     * <p>The subscription name is reused as the reader name, and the receiver queue size follows the same
+     * {@code consumerReceiverQueueSize} setting that the durable consumer path uses.
+     *
+     * @param pulsarSpoutConf spout configuration to read topic, name, queue size and start position from
+     * @return a new reader configuration
+     */
+    private ReaderConfigurationData<byte[]> newReaderConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
+        ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
+        readerConf.setTopicName(pulsarSpoutConf.getTopic());
+        readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
+        readerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
+        readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+        return readerConf;
+    }
+
+    /**
+     * Builds the consumer configuration used when the spout runs with a durable subscription.
+     *
+     * @param pulsarSpoutConf spout configuration to read topic, subscription name/type and queue size from
+     * @return a new single-topic consumer configuration
+     */
+    private ConsumerConfigurationData<byte[]> newConsumerConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
+        // The parameter intentionally shadows the field so the method uses the configuration it is given.
+        ConsumerConfigurationData<byte[]> consumerConf = new ConsumerConfigurationData<>();
+        consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
+        consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+        consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
+        consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
+        return consumerConf;
+    }
+
+    /**
+     * {@link PulsarSpoutConsumer} backed by a durable Pulsar {@link Consumer}. Every operation is forwarded
+     * unchanged to the wrapped consumer.
+     */
+    static class SpoutConsumer implements PulsarSpoutConsumer {
+        private final Consumer<byte[]> delegate;
+
+        public SpoutConsumer(Consumer<byte[]> pulsarConsumer) {
+            this.delegate = pulsarConsumer;
+        }
+
+        /** Waits up to {@code timeout} for the next message from the subscription. */
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return delegate.receive(timeout, unit);
+        }
+
+        /** Acknowledges {@code msg} asynchronously; the returned future is not awaited. */
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            delegate.acknowledgeAsync(msg);
+        }
+
+        @Override
+        public void close() throws PulsarClientException {
+            delegate.close();
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            delegate.unsubscribe();
+        }
+    }
+
+    /**
+     * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Reader}. It is used for non-durable subscriptions;
+     * acknowledgement and unsubscribe are deliberately no-ops for this implementation.
+     */
+    static class SpoutReader implements PulsarSpoutConsumer {
+        private final Reader<byte[]> reader;
+
+        public SpoutReader(Reader<byte[]> reader) {
+            this.reader = reader;
+        }
+
+        /** Waits up to {@code timeout} for the next message via {@link Reader#readNext(int, TimeUnit)}. */
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return reader.readNext(timeout, unit);
+        }
+
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            // No-op: this wrapper does not acknowledge messages.
+        }
+
+        /**
+         * Closes the reader. A {@link PulsarClientException} is rethrown as-is; any other I/O failure is
+         * wrapped so callers see a single exception type.
+         */
+        @Override
+        public void close() throws PulsarClientException {
+            try {
+                reader.close();
+            } catch (PulsarClientException e) {
+                // Already the declared type: don't double-wrap and lose the specific subclass.
+                throw e;
+            } catch (IOException e) {
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            // No-op: this wrapper has nothing to unsubscribe.
+        }
+    }
}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 0f27ddc..daa598f 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.storm;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
/**
@@ -47,7 +48,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
private SubscriptionType subscriptionType = SubscriptionType.Shared;
private boolean autoUnsubscribe = false;
private int consumerReceiverQueueSize = 1000;
+ private boolean durableSubscription = true;
+ // read position if non-durable subscription is enabled : default oldest message available in topic
+ private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
+
/**
* @return the subscription name for the consumer in the spout
*/
@@ -174,4 +179,31 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
public void setAutoUnsubscribe(boolean autoUnsubscribe) {
this.autoUnsubscribe = autoUnsubscribe;
}
+
+    /**
+     * Returns whether the spout reads through a durable subscription.
+     *
+     * @return {@code true} (the default) to read through a durable consumer subscription, {@code false} to
+     *         read through a non-durable reader
+     */
+    public boolean isDurableSubscription() {
+        return durableSubscription;
+    }
+
+    /**
+     * Selects between a durable subscription and a non-durable reader. If the subscription is not durable,
+     * the spout creates a non-durable reader that starts reading from the position configured with
+     * {@link #setNonDurableSubscriptionReadPosition(MessageId)}.
+     *
+     * @param durableSubscription {@code true} to use a durable consumer subscription, {@code false} to use a
+     *            non-durable reader
+     */
+    public void setDurableSubscription(boolean durableSubscription) {
+        this.durableSubscription = durableSubscription;
+    }
+
+    /**
+     * Returns the position a non-durable reader starts from.
+     *
+     * @return the start position; defaults to {@link MessageId#earliest}
+     */
+    public MessageId getNonDurableSubscriptionReadPosition() {
+        return nonDurableSubscriptionReadPosition;
+    }
+
+    /**
+     * Sets the position a non-durable subscription (reader) starts reading from, e.g.
+     * {@link MessageId#earliest} or {@link MessageId#latest}. Only used when
+     * {@link #isDurableSubscription()} is {@code false}.
+     *
+     * @param nonDurableSubscriptionReadPosition the message id to start reading from
+     */
+    public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
+        this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
+    }
}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
new file mode 100644
index 0000000..d845c4e
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Minimal message-source abstraction polled by {@code PulsarSpout}, so the spout can read either through a
+ * durable consumer subscription or through a non-durable reader.
+ */
+public interface PulsarSpoutConsumer {
+
+    /**
+     * Receives a single message, waiting at most the given time.
+     *
+     * @param waitTime
+     *            maximum time to wait for a message
+     * @param unit
+     *            time unit of {@code waitTime}
+     * @return the received message, or {@code null} if none arrived before the timeout
+     *         (NOTE(review): null-on-timeout mirrors the Pulsar client receive/readNext contract — confirm)
+     * @throws PulsarClientException
+     *             if the underlying client fails to receive
+     */
+    Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException;
+
+    /**
+     * Acknowledges the message asynchronously. Implementations may treat this as a no-op.
+     *
+     * @param msg
+     *            the message to acknowledge
+     */
+    void acknowledgeAsync(Message<?> msg);
+
+    /**
+     * Unsubscribes the consumer. Implementations may treat this as a no-op.
+     *
+     * @throws PulsarClientException
+     *             if unsubscribing fails
+     */
+    void unsubscribe() throws PulsarClientException;
+
+    /**
+     * Closes the consumer.
+     *
+     * @throws PulsarClientException
+     *             if closing fails
+     */
+    void close() throws PulsarClientException;
+
+}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index d07903e..b8263a4 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -27,10 +27,12 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ public class SharedPulsarClient {
private final AtomicInteger counter = new AtomicInteger();
private Consumer<byte[]> consumer;
+ private Reader<byte[]> reader;
private Producer<byte[]> producer;
private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
@@ -104,6 +107,23 @@ public class SharedPulsarClient {
return consumer;
}
+ public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
+ counter.incrementAndGet();
+ synchronized (this) {
+ if (reader == null) {
+ try {
+ reader = client.createReaderAsync(readerConf).join();
+ } catch (CompletionException e) {
+ throw (PulsarClientException) e.getCause();
+ }
+ LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
+ } else {
+ LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
+ }
+ }
+ return reader;
+ }
+
public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
counter.incrementAndGet();
synchronized (this) {
@@ -130,4 +150,5 @@ public class SharedPulsarClient {
}
}
}
+
}
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 588c5a0..5764ac7 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -25,9 +25,16 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
@@ -36,6 +43,11 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
@@ -73,16 +85,69 @@ public class PulsarSpoutTest {
Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
Consumer<byte[]> consumer = mock(Consumer.class);
+ SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
CompletableFuture<Void> future = new CompletableFuture<>();
future.complete(null);
doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
Field consField = PulsarSpout.class.getDeclaredField("consumer");
consField.setAccessible(true);
- consField.set(spout, consumer);
+ consField.set(spout, spoutConsumer);
spout.fail(msg);
spout.ack(msg);
spout.emitNextAvailableTuple();
verify(consumer, atLeast(1)).receive(anyInt(), any());
}
+
+    /**
+     * Opens a spout backed by a mocked shared consumer and checks that one emit cycle polls the consumer
+     * and runs the message mapper. The mock is registered in {@code SharedPulsarClient.instances} for the
+     * duration of the test only.
+     */
+    @Test
+    public void testPulsarSpout() throws Exception {
+        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+        conf.setServiceUrl("http://localhost:8080");
+        conf.setSubscriptionName("sub1");
+        conf.setTopic("persistent://prop/ns1/topic1");
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        conf.setSharedConsumerEnabled(true);
+        AtomicBoolean called = new AtomicBoolean(false);
+        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+            @Override
+            public Values toValues(Message<byte[]> msg) {
+                called.set(true);
+                return new Values("test");
+            }
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            }
+
+        });
+
+        ClientBuilder builder = spy(new ClientBuilderImpl());
+        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+        TopologyContext context = mock(TopologyContext.class);
+        final String componentId = "test-component-id";
+        doReturn(componentId).when(context).getThisComponentId();
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        Map<String, Object> config = new HashMap<>();
+        Field field = SharedPulsarClient.class.getDeclaredField("instances");
+        field.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field
+                .get(null);
+
+        SharedPulsarClient client = mock(SharedPulsarClient.class);
+        Consumer<byte[]> consumer = mock(Consumer.class);
+        when(client.getSharedConsumer(any())).thenReturn(consumer);
+        instances.put(componentId, client);
+
+        ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        try {
+            data.writeBytes("test".getBytes());
+            Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES);
+            when(consumer.receive(anyInt(), any())).thenReturn(msg);
+
+            spout.open(config, context, collector);
+            spout.emitNextAvailableTuple();
+
+            assertTrue(called.get());
+            verify(consumer, atLeast(1)).receive(anyInt(), any());
+        } finally {
+            // The instances map is static: unregister the mock so other tests don't get this stubbed client,
+            // and return the pooled buffer to the allocator.
+            instances.remove(componentId, client);
+            data.release();
+        }
+    }
+
}