You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/14 17:36:23 UTC
[pulsar] branch master updated: Fix NPE of ProducerInterceptors (#4517)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2285f0d Fix NPE of ProducerInterceptors (#4517)
2285f0d is described below
commit 2285f0d65de293a41cb964b8045871e4a530b529
Author: lipenghui <co...@gmail.com>
AuthorDate: Sat Jun 15 01:36:17 2019 +0800
Fix NPE of ProducerInterceptors (#4517)
* Fix ProducerInterceptors NPE issue.
* Add a unit test for ProducerInterceptors covering the case where an interceptor throws exceptions.
---
.../apache/pulsar/client/api/InterceptorsTest.java | 28 ++++++++++++++++++++++
.../pulsar/client/impl/ProducerInterceptors.java | 4 ++--
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index e531f08..c079d65 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -118,6 +118,34 @@ public class InterceptorsTest extends ProducerConsumerBase {
}
@Test
+ public void testProducerInterceptorsWithExceptions() throws PulsarClientException {
+ ProducerInterceptor<String> interceptor = new ProducerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
+ throw new NullPointerException();
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable exception) {
+ throw new NullPointerException();
+ }
+ };
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .intercept(interceptor)
+ .create();
+
+ MessageId messageId = producer.newMessage().value("Hello Pulsar!").send();
+ Assert.assertNotNull(messageId);
+ producer.close();
+ }
+
+ @Test
public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
index 59d1ad3..c70e4a3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -64,8 +64,8 @@ public class ProducerInterceptors<T> implements Closeable {
try {
interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage);
} catch (Exception e) {
- if (message != null && producer != null) {
- log.warn("Error executing interceptor beforeSend callback for messageId: {}, topicName:{} ", message.getMessageId(), producer.getTopic(), e);
+ if (producer != null) {
+ log.warn("Error executing interceptor beforeSend callback for topicName:{} ", producer.getTopic(), e);
} else {
log.warn("Error Error executing interceptor beforeSend callback ", e);
}