You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/09 02:09:51 UTC
[pulsar] branch master updated: [pulsar-storm] Fix NPE while
emitting next tuple (#3991)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 12de91f [pulsar-storm] Fix NPE while emitting next tuple (#3991)
12de91f is described below
commit 12de91fb231c3e609137de60bbdbc29c5c2f2e13
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Apr 8 19:09:46 2019 -0700
[pulsar-storm] Fix NPE while emitting next tuple (#3991)
### Motivation
`PulsarSpout` removes an acked message from [pendingMessageRetries](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L126), but it does not remove that message from the `failedMessages` queue. As a result, a message that was failed and later acked is still in `failedMessages` with no matching entry in `pendingMessageRetries`, so the lookup returns null and `PulsarSpout` throws an NPE while [emitting the next tuple](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L181).
```
stack-trace with old pulsar-storm lib: 1.20
2019-04-05 18:49:58.240 b.s.util CmsSpout_[1 1] [INFO] Async loop Stacktrace is: {} java.lang.NullPointerException
at org.apache.pulsar.storm.PulsarSpout.emitNextAvailableTuple(PulsarSpout.java:176)
at org.apache.pulsar.storm.PulsarSpout.nextTuple(PulsarSpout.java:160)
at backtype.storm.daemon.executor$fn__7365$fn__7380$fn__7411.invoke(executor.clj:577)
at backtype.storm.util$async_loop$fn__551.invoke(util.clj:491)
at clojure.lang.AFn.run(AFn.java:22)
at java.lang.Thread.run(Thread.java:748)
```
---
.../java/org/apache/pulsar/storm/PulsarSpout.java | 54 +++++++++----
.../org/apache/pulsar/storm/PulsarSpoutTest.java | 88 ++++++++++++++++++++++
2 files changed, 127 insertions(+), 15 deletions(-)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 8907fc3..2588741 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.shade.org.eclipse.jetty.util.log.Log;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -124,6 +125,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
}
consumer.acknowledgeAsync(msg);
pendingMessageRetries.remove(msg.getMessageId());
+ // we should also remove message from failedMessages but it will be eventually removed while emitting next
+ // tuple
--pendingAcks;
}
}
@@ -172,25 +175,12 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
* emit.
*/
public void emitNextAvailableTuple() {
- Message<byte[]> msg;
-
// check if there are any failed messages to re-emit in the topology
- msg = failedMessages.peek();
- if (msg != null) {
- MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
- if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
- messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
- clientConf.getMaxBackoffIntervalNanos())) {
- Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
- } else {
- // remove the message from the queue and emit to the topology, only if it should not be backedoff
- LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
- failedMessages.remove();
- mapToValueAndEmit(msg);
- }
+ if(emitFailedMessage()) {
return;
}
+ Message<byte[]> msg;
// receive from consumer if no failed messages
if (consumer != null) {
if (LOG.isDebugEnabled()) {
@@ -215,6 +205,40 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
}
}
+ private boolean emitFailedMessage() {
+ Message<byte[]> msg;
+
+ while ((msg = failedMessages.peek()) != null) {
+ MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
+ if (messageRetries != null) {
+ // emit the tuple if retry doesn't need backoff else sleep with backoff time and return without doing
+ // anything
+ if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
+ messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+ clientConf.getMaxBackoffIntervalNanos())) {
+ Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
+ } else {
+ // remove the message from the queue and emit to the topology, only if it should not be backedoff
+ LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
+ failedMessages.remove();
+ mapToValueAndEmit(msg);
+ }
+ return true;
+ }
+
+ // messageRetries is null because messageRetries is already acked and removed from pendingMessageRetries
+ // then remove it from failed message queue as well.
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
+ pulsarSpoutConf.getTopic(), spoutId, msg.getMessageId());
+ }
+ failedMessages.remove();
+ // try to find out next failed message
+ continue;
+ }
+ return false;
+ }
+
@Override
@SuppressWarnings({ "rawtypes" })
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
new file mode 100644
index 0000000..588c5a0
--- /dev/null
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class PulsarSpoutTest {
+
+ private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);
+
+ @Test
+ public void testAckFailedMessage() throws Exception {
+
+ PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
+ conf.setServiceUrl("http://localhost:8080");
+ conf.setSubscriptionName("sub1");
+ conf.setTopic("persistent://prop/ns1/topic1");
+ conf.setSubscriptionType(SubscriptionType.Exclusive);
+ conf.setMessageToValuesMapper(new MessageToValuesMapper() {
+ @Override
+ public Values toValues(Message<byte[]> msg) {
+ return null;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ });
+
+ ClientBuilder builder = spy(new ClientBuilderImpl());
+ PulsarSpout spout = spy(new PulsarSpout(conf, builder));
+
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
+ Consumer<byte[]> consumer = mock(Consumer.class);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.complete(null);
+ doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId());
+ Field consField = PulsarSpout.class.getDeclaredField("consumer");
+ consField.setAccessible(true);
+ consField.set(spout, consumer);
+
+ spout.fail(msg);
+ spout.ack(msg);
+ spout.emitNextAvailableTuple();
+ verify(consumer, atLeast(1)).receive(anyInt(), any());
+ }
+}