You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/05/20 20:22:28 UTC

[pulsar] branch master updated: [pulsar-storm] support reader for pulsar-spout (#4236)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 407c445  [pulsar-storm] support reader for pulsar-spout (#4236)
407c445 is described below

commit 407c4452e904c111f0a02d1c970ac1ef9de834fc
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon May 20 13:22:23 2019 -0700

    [pulsar-storm] support reader for pulsar-spout (#4236)
    
    * [pulsar-storm] pulsar-spout can use a reader to read messages without a durable subscription
    
    * fix test
---
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 132 +++++++++++++++++----
 .../pulsar/storm/PulsarSpoutConfiguration.java     |  32 +++++
 .../apache/pulsar/storm/PulsarSpoutConsumer.java   |  58 +++++++++
 .../apache/pulsar/storm/SharedPulsarClient.java    |  21 ++++
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   |  67 ++++++++++-
 5 files changed, 289 insertions(+), 21 deletions(-)

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 5a5ea59..0d1bebc 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.storm;
 
 import static java.lang.String.format;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -35,12 +36,13 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.shade.org.eclipse.jetty.util.log.Log;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -65,7 +67,6 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
 
     private final ClientConfigurationData clientConf;
-    private final ConsumerConfigurationData<byte[]> consumerConf;
     private final PulsarSpoutConfiguration pulsarSpoutConf;
     private final long failedRetriesTimeoutNano;
     private final int maxFailedRetries;
@@ -77,7 +78,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     private String componentId;
     private String spoutId;
     private SpoutOutputCollector collector;
-    private Consumer<byte[]> consumer;
+    private PulsarSpoutConsumer consumer;
     private volatile long messagesReceived = 0;
     private volatile long messagesEmitted = 0;
     private volatile long messagesFailed = 0;
@@ -93,12 +94,6 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
 
         this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
         this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
-        this.consumerConf = new ConsumerConfigurationData<>();
-        this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
-        this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
-        this.consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
-        this.consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
-
         this.pulsarSpoutConf = pulsarSpoutConf;
         this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
         this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
@@ -110,7 +105,12 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
             LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
             
             if (pulsarSpoutConf.isAutoUnsubscribe()) {
-                consumer.unsubscribe();
+                try {
+                    consumer.unsubscribe();    
+                }catch(PulsarClientException e) {
+                    LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId,
+                            this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e);
+                }
             }
             
             if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
@@ -259,16 +259,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         pendingMessageRetries.clear();
         failedMessages.clear();
         try {
-            sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
-            if (pulsarSpoutConf.isSharedConsumerEnabled()) {
-                consumer = sharedPulsarClient.getSharedConsumer(consumerConf);
-            } else {
-                try {
-                    consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join();
-                } catch (CompletionException e) {
-                    throw (PulsarClientException) e.getCause();
-                }
-            }
+            consumer = createConsumer();
             LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
                     pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
         } catch (PulsarClientException e) {
@@ -280,6 +271,27 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
                 pulsarSpoutConf.getMetricsTimeIntervalInSecs());
     }
 
+    private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
+        sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
+        boolean durable = pulsarSpoutConf.isDurableSubscription();
+        if (pulsarSpoutConf.isSharedConsumerEnabled()) {
+            return durable
+                    ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration(pulsarSpoutConf)))
+                    : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration(pulsarSpoutConf)));
+        }
+        try {
+            return durable
+                    ? new SpoutConsumer(sharedPulsarClient.getClient()
+                            .subscribeAsync(newConsumerConfiguration(pulsarSpoutConf)).join())
+                    : new SpoutReader(sharedPulsarClient.getClient()
+                            .createReaderAsync(newReaderConfiguration(pulsarSpoutConf)).join());
+        } catch (CompletionException e) {
+            Throwable cause = e.getCause(); // may not be a PulsarClientException: wrap instead of blind-casting
+            throw cause instanceof PulsarClientException ? (PulsarClientException) cause
+                    : new PulsarClientException(cause);
+        }
+    }
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
@@ -363,4 +375,109 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         resetMetrics();
         return metrics;
     }
+
+    /**
+     * Builds the configuration for a non-durable {@link Reader} from the spout configuration.
+     *
+     * <p>The reader is named after the subscription name, starts reading at
+     * {@link PulsarSpoutConfiguration#getNonDurableSubscriptionReadPosition()} and uses the same receiver
+     * queue size that a durable consumer would.
+     *
+     * @param pulsarSpoutConf the spout configuration to read settings from
+     * @return a new reader configuration for the spout's topic
+     */
+    private ReaderConfigurationData<byte[]> newReaderConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
+        ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
+        readerConf.setTopicName(pulsarSpoutConf.getTopic());
+        readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
+        readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
+        // apply the configured queue size to readers as well, instead of silently using the reader default
+        readerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
+        return readerConf;
+    }
+
+    /**
+     * Builds the configuration for a durable {@link Consumer} from the spout configuration.
+     *
+     * @param pulsarSpoutConf the spout configuration to read settings from
+     * @return a new consumer configuration subscribed to the spout's topic
+     */
+    private ConsumerConfigurationData<byte[]> newConsumerConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
+        ConsumerConfigurationData<byte[]> consumerConf = new ConsumerConfigurationData<>();
+        consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
+        consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
+        consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
+        consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
+        return consumerConf;
+    }
+
+    /**
+     * {@link PulsarSpoutConsumer} backed by a durable Pulsar {@link Consumer}; every call is delegated to it.
+     */
+    static class SpoutConsumer implements PulsarSpoutConsumer {
+        private final Consumer<byte[]> consumer;
+
+        public SpoutConsumer(Consumer<byte[]> consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return consumer.receive(timeout, unit);
+        }
+
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            consumer.acknowledgeAsync(msg);
+        }
+
+        @Override
+        public void close() throws PulsarClientException {
+            consumer.close();
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            consumer.unsubscribe();
+        }
+    }
+
+    /**
+     * {@link PulsarSpoutConsumer} backed by a Pulsar {@link Reader}, used when the spout is configured without a
+     * durable subscription. Acknowledging and unsubscribing are therefore no-ops.
+     */
+    static class SpoutReader implements PulsarSpoutConsumer {
+        private final Reader<byte[]> reader;
+
+        public SpoutReader(Reader<byte[]> reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
+            return reader.readNext(timeout, unit);
+        }
+
+        @Override
+        public void acknowledgeAsync(Message<?> msg) {
+            // No-op: messages read through a Reader are not acknowledged.
+        }
+
+        @Override
+        public void close() throws PulsarClientException {
+            try {
+                reader.close();
+            } catch (PulsarClientException e) {
+                // rethrow unchanged so callers still see the specific PulsarClientException subclass
+                throw e;
+            } catch (IOException e) {
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public void unsubscribe() throws PulsarClientException {
+            // No-op: a reader has no durable subscription to remove.
+        }
+    }
 }
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 0f27ddc..daa598f 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.storm;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 /**
@@ -47,7 +48,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
     private SubscriptionType subscriptionType = SubscriptionType.Shared;
     private boolean autoUnsubscribe = false;
     private int consumerReceiverQueueSize = 1000;
+    private boolean durableSubscription = true;
+    // read position if non-durable subscription is enabled : default oldest message available in topic
+    private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; 
 
+    
     /**
      * @return the subscription name for the consumer in the spout
      */
@@ -174,4 +179,39 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
     public void setAutoUnsubscribe(boolean autoUnsubscribe) {
         this.autoUnsubscribe = autoUnsubscribe;
     }
+
+    /**
+     * @return true if the spout reads through a durable subscription (the default); false if it uses a
+     *         non-durable reader
+     */
+    public boolean isDurableSubscription() {
+        return durableSubscription;
+    }
+
+    /**
+     * If the subscription is not durable, the spout creates a non-durable reader that starts reading from the
+     * position set by {@link #setNonDurableSubscriptionReadPosition(MessageId)} in the topic.
+     *
+     * @param durableSubscription true to use a durable subscription, false to use a non-durable reader
+     */
+    public void setDurableSubscription(boolean durableSubscription) {
+        this.durableSubscription = durableSubscription;
+    }
+
+    /**
+     * @return the position a non-durable reader starts reading from; {@link MessageId#earliest} by default
+     */
+    public MessageId getNonDurableSubscriptionReadPosition() {
+        return nonDurableSubscriptionReadPosition;
+    }
+
+    /**
+     * Sets the position, for example {@link MessageId#earliest} or {@link MessageId#latest}, from which a
+     * non-durable subscription (reader) starts reading.
+     *
+     * @param nonDurableSubscriptionReadPosition the start position for the non-durable reader
+     */
+    public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
+        this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
+    }
 }
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
new file mode 100644
index 0000000..d845c4e
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Source of messages for {@link PulsarSpout}, abstracting over a durable Pulsar consumer and a non-durable
+ * reader.
+ */
+public interface PulsarSpoutConsumer {
+
+    /**
+     * Receives a single message, waiting up to the given time.
+     *
+     * @param waitTime the maximum time to wait for a message
+     * @param unit the time unit of {@code waitTime}
+     * @return the received message; presumably {@code null} if none arrived in time (TODO confirm against
+     *         the Pulsar client API)
+     * @throws PulsarClientException if receiving fails
+     */
+    Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException;
+
+    /**
+     * Acknowledges the message asynchronously. Implementations that do not use a durable subscription may
+     * treat this as a no-op.
+     *
+     * @param msg the message to acknowledge
+     */
+    void acknowledgeAsync(Message<?> msg);
+
+    /**
+     * Unsubscribes the consumer. Implementations that do not use a durable subscription may treat this as a
+     * no-op.
+     *
+     * @throws PulsarClientException if unsubscribing fails
+     */
+    void unsubscribe() throws PulsarClientException;
+
+    /**
+     * Closes the consumer.
+     *
+     * @throws PulsarClientException if closing fails
+     */
+    void close() throws PulsarClientException;
+
+}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
index d07903e..b8263a4 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java
@@ -27,10 +27,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +45,7 @@ public class SharedPulsarClient {
     private final AtomicInteger counter = new AtomicInteger();
 
     private Consumer<byte[]> consumer;
+    private Reader<byte[]> reader;
     private Producer<byte[]> producer;
 
     private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
@@ -104,6 +107,35 @@ public class SharedPulsarClient {
         return consumer;
     }
 
+    /**
+     * Returns the reader shared through this {@code SharedPulsarClient}, creating it on first use with the given
+     * configuration. Later calls reuse the existing reader; {@code readerConf} is then only used for logging.
+     * Every call increments this instance's usage counter, even when creation fails.
+     *
+     * @param readerConf configuration used if the shared reader does not exist yet
+     * @return the shared reader
+     * @throws PulsarClientException if the reader cannot be created
+     */
+    public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
+        counter.incrementAndGet();
+        synchronized (this) {
+            if (reader == null) {
+                try {
+                    reader = client.createReaderAsync(readerConf).join();
+                } catch (CompletionException e) {
+                    // don't assume the cause type: a blind cast would hide the real failure behind a ClassCastException
+                    Throwable cause = e.getCause();
+                    throw cause instanceof PulsarClientException ? (PulsarClientException) cause
+                            : new PulsarClientException(cause);
+                }
+                LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
+            } else {
+                LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
+            }
+        }
+        return reader;
+    }
+
     public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
         counter.incrementAndGet();
         synchronized (this) {
@@ -130,4 +162,5 @@ public class SharedPulsarClient {
             }
         }
     }
+
 }
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index 588c5a0..5764ac7 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -25,9 +25,16 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
 
 import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -36,6 +43,11 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
@@ -73,16 +85,75 @@ public class PulsarSpoutTest {
         
         Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
         Consumer<byte[]> consumer = mock(Consumer.class);
+        SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
         CompletableFuture<Void> future = new CompletableFuture<>();
         future.complete(null);
         doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
         Field consField = PulsarSpout.class.getDeclaredField("consumer");
         consField.setAccessible(true);
-        consField.set(spout, consumer);
+        consField.set(spout, spoutConsumer);
         
         spout.fail(msg);
         spout.ack(msg);
         spout.emitNextAvailableTuple();
         verify(consumer, atLeast(1)).receive(anyInt(), any());
     }
+
+    @Test
+    public void testPulsarSpout() throws Exception {
+        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+        conf.setServiceUrl("http://localhost:8080");
+        conf.setSubscriptionName("sub1");
+        conf.setTopic("persistent://prop/ns1/topic1");
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        conf.setSharedConsumerEnabled(true);
+        AtomicBoolean called = new AtomicBoolean(false);
+        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+            @Override
+            public Values toValues(Message<byte[]> msg) {
+                called.set(true);
+                return new Values("test");
+            }
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            }
+
+        });
+
+        ClientBuilder builder = spy(new ClientBuilderImpl());
+        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+        TopologyContext context = mock(TopologyContext.class);
+        final String componentId = "test-component-id";
+        doReturn(componentId).when(context).getThisComponentId();
+        SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+        Map<String, Object> config = new HashMap<>();
+        Field field = SharedPulsarClient.class.getDeclaredField("instances");
+        field.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        ConcurrentMap<String, SharedPulsarClient> instances = (ConcurrentMap<String, SharedPulsarClient>) field
+                .get(SharedPulsarClient.class);
+
+        SharedPulsarClient client = mock(SharedPulsarClient.class);
+        @SuppressWarnings("unchecked")
+        Consumer<byte[]> consumer = mock(Consumer.class);
+        when(client.getSharedConsumer(any())).thenReturn(consumer);
+        // the mocked client goes into a static map; always remove it so it cannot leak into other tests
+        instances.put(componentId, client);
+        try {
+            ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+            data.writeBytes("test".getBytes());
+            Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES);
+            when(consumer.receive(anyInt(), any())).thenReturn(msg);
+
+            spout.open(config, context, collector);
+            spout.emitNextAvailableTuple();
+
+            assertTrue(called.get());
+            verify(consumer, atLeast(1)).receive(anyInt(), any());
+        } finally {
+            instances.remove(componentId);
+        }
+    }
+
 }