You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/14 17:36:23 UTC

[pulsar] branch master updated: Fix NPE of ProducerInterceptors (#4517)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2285f0d  Fix NPE of ProducerInterceptors (#4517)
2285f0d is described below

commit 2285f0d65de293a41cb964b8045871e4a530b529
Author: lipenghui <co...@gmail.com>
AuthorDate: Sat Jun 15 01:36:17 2019 +0800

    Fix NPE of ProducerInterceptors (#4517)
    
    * Fix ProducerInterceptors NPE issue.
    
    * Add a unit test for ProducerInterceptors covering interceptors that throw exceptions.
---
 .../apache/pulsar/client/api/InterceptorsTest.java | 28 ++++++++++++++++++++++
 .../pulsar/client/impl/ProducerInterceptors.java   |  4 ++--
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index e531f08..c079d65 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -118,6 +118,34 @@ public class InterceptorsTest extends ProducerConsumerBase {
     }
 
     @Test
+    public void testProducerInterceptorsWithExceptions() throws PulsarClientException {
+        ProducerInterceptor<String> interceptor = new ProducerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
+                throw new NullPointerException();
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable exception) {
+                throw new NullPointerException();
+            }
+        };
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic("persistent://my-property/my-ns/my-topic")
+            .intercept(interceptor)
+            .create();
+
+        MessageId messageId = producer.newMessage().value("Hello Pulsar!").send();
+        Assert.assertNotNull(messageId);
+        producer.close();
+    }
+
+    @Test
     public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
         ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
             @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
index 59d1ad3..c70e4a3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -64,8 +64,8 @@ public class ProducerInterceptors<T> implements Closeable {
             try {
                 interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage);
             } catch (Exception e) {
-                if (message != null && producer != null) {
-                    log.warn("Error executing interceptor beforeSend callback for messageId: {}, topicName:{} ", message.getMessageId(), producer.getTopic(), e);
+                if (producer != null) {
+                    log.warn("Error executing interceptor beforeSend callback for topicName:{} ", producer.getTopic(), e);
                 } else {
                     log.warn("Error Error executing interceptor beforeSend callback ", e);
                 }