You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/09 02:09:51 UTC

[pulsar] branch master updated: [pulsar-storm] Fix NPE while emitting next tuple (#3991)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 12de91f  [pulsar-storm] Fix NPE while emitting next tuple (#3991)
12de91f is described below

commit 12de91fb231c3e609137de60bbdbc29c5c2f2e13
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Apr 8 19:09:46 2019 -0700

    [pulsar-storm] Fix NPE while emitting next tuple (#3991)
    
    ### Motivation
    
    [PulsarSpout] removes messages from [pendingMessageRetries](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L126) when they are acked, but it does not remove them from the `failedMessages` queue. As a result, PulsarSpout throws an NPE while [emitting the next tuple](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L181).
    
    ```
    Stack trace with the old pulsar-storm lib (1.20):
    2019-04-05 18:49:58.240 b.s.util CmsSpout_[1 1] [INFO] Async loop Stacktrace is: {} java.lang.NullPointerException
        at org.apache.pulsar.storm.PulsarSpout.emitNextAvailableTuple(PulsarSpout.java:176)
        at org.apache.pulsar.storm.PulsarSpout.nextTuple(PulsarSpout.java:160)
        at backtype.storm.daemon.executor$fn__7365$fn__7380$fn__7411.invoke(executor.clj:577)
        at backtype.storm.util$async_loop$fn__551.invoke(util.clj:491)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:748)
    ```
---
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 54 +++++++++----
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 88 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 15 deletions(-)

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 8907fc3..2588741 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.shade.org.eclipse.jetty.util.log.Log;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -124,6 +125,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
             }
             consumer.acknowledgeAsync(msg);
             pendingMessageRetries.remove(msg.getMessageId());
+            // we should also remove message from failedMessages but it will be eventually removed while emitting next
+            // tuple
             --pendingAcks;
         }
     }
@@ -172,25 +175,12 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
      * emit.
      */
     public void emitNextAvailableTuple() {
-        Message<byte[]> msg;
-
         // check if there are any failed messages to re-emit in the topology
-        msg = failedMessages.peek();
-        if (msg != null) {
-            MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
-            if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
-                    messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(), 
-                    clientConf.getMaxBackoffIntervalNanos())) {
-                Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
-            } else {
-                // remove the message from the queue and emit to the topology, only if it should not be backedoff
-                LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
-                failedMessages.remove();
-                mapToValueAndEmit(msg);
-            }
+        if(emitFailedMessage()) {
             return;
         }
 
+        Message<byte[]> msg;
         // receive from consumer if no failed messages
         if (consumer != null) {
             if (LOG.isDebugEnabled()) {
@@ -215,6 +205,40 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         }
     }
 
+    /**
+     * Tries to re-emit the failed message at the head of {@code failedMessages}.
+     * <p>
+     * Head entries that no longer have a {@code pendingMessageRetries} record (i.e. were acked after failing) are
+     * discarded. For the first entry that still has a record, this either sleeps for the default backoff interval
+     * (when {@code Backoff.shouldBackoff} says so, leaving the entry queued) or dequeues it and emits it.
+     *
+     * @return {@code true} if a failed message was emitted or a backoff sleep was taken, so the caller should not
+     *         also receive a new message on this call; {@code false} if {@code failedMessages} is (now) empty
+     */
+    private boolean emitFailedMessage() {
+        Message<byte[]> msg;
+
+        while ((msg = failedMessages.peek()) != null) {
+            MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
+            if (messageRetries != null) {
+                // Emit the tuple if the retry doesn't need backoff; otherwise sleep for the default backoff interval
+                // and return without emitting, leaving the message at the head of the queue for a later call.
+                // NOTE(review): the sleep is the fixed default interval, not the remaining computed backoff time.
+                if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
+                        messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+                        clientConf.getMaxBackoffIntervalNanos())) {
+                    Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
+                } else {
+                    // No backoff needed: remove the (just peeked) message from the queue and emit it to the topology.
+                    LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
+                    failedMessages.remove();
+                    mapToValueAndEmit(msg);
+                }
+                return true;
+            }
+
+            // No retry record: the message was already acked, and ack removes it from pendingMessageRetries but not
+            // from failedMessages. Drop the stale entry here instead of dereferencing a null MessageRetries.
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
+                        pulsarSpoutConf.getTopic(), spoutId, msg.getMessageId());
+            }
+            failedMessages.remove();
+            // look at the next failed message
+            continue;
+        }
+        return false;
+    }
+
     @Override
     @SuppressWarnings({ "rawtypes" })
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
new file mode 100644
index 0000000..588c5a0
--- /dev/null
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Unit tests for {@link PulsarSpout}.
+ */
+public class PulsarSpoutTest {
+
+    // NOTE(review): this logger is not used by any test in this class.
+    private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);
+    
+    /**
+     * Regression test for the NPE in {@code emitNextAvailableTuple}: a message that is failed and then acked must be
+     * discarded from the failed-message queue, after which the spout falls through to receiving from the consumer.
+     */
+    @Test
+    public void testAckFailedMessage() throws Exception {
+        
+        // Minimal spout configuration. Presumably no broker is needed, since open() is never called and the
+        // consumer is replaced by a mock below.
+        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+        conf.setServiceUrl("http://localhost:8080");
+        conf.setSubscriptionName("sub1");
+        conf.setTopic("persistent://prop/ns1/topic1");
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        // Mapper that produces no values and declares no fields; this test checks control flow, not emitted tuples.
+        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+            @Override
+            public Values toValues(Message<byte[]> msg) {
+                return null;
+            }
+
+            @Override
+            public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            }
+            
+        });
+        
+        // Mockito spies wrap real instances; no behaviour is stubbed on either of them in this test.
+        ClientBuilder builder = spy(new ClientBuilderImpl());
+        PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+        
+        // In-memory message; "1:1" is presumably the message-id string (ledgerId:entryId) - confirm against
+        // the MessageImpl constructor.
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
+        // Mock consumer whose acknowledgeAsync(MessageId) returns an already-completed future.
+        // NOTE(review): the ack path in this commit calls acknowledgeAsync(msg) with the Message rather than its
+        // MessageId, so this stub may never match; confirm whether it is needed.
+        Consumer<byte[]> consumer = mock(Consumer.class);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        future.complete(null);
+        doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
+        // Inject the mock into PulsarSpout's private "consumer" field via reflection.
+        Field consField = PulsarSpout.class.getDeclaredField("consumer");
+        consField.setAccessible(true);
+        consField.set(spout, consumer);
+        
+        // fail() presumably queues the message for retry; ack() then removes its retry record, leaving a stale
+        // entry in failedMessages - the scenario that used to throw an NPE in emitNextAvailableTuple().
+        spout.fail(msg);
+        spout.ack(msg);
+        spout.emitNextAvailableTuple();
+        // The stale entry must have been discarded, so the spout falls through to polling the consumer.
+        verify(consumer, atLeast(1)).receive(anyInt(), any());
+    }
+}