You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/22 21:54:12 UTC

[pulsar] branch master updated: Feature / Interceptor for negative ack redelivery (#3962)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7890750  Feature / Interceptor for negative ack redelivery (#3962)
7890750 is described below

commit 789075013b404780a62c89b2acfea343680d73b3
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Apr 22 18:54:07 2019 -0300

    Feature / Interceptor for negative ack redelivery (#3962)
    
    * Feature / Interceptor for negative ack redelivery
    
    *Motivation*
    
    In some scenarios it is helpful to be able to set an interceptor for
    redeliveries that happen due to a negative acknowledgement.
    
    *Modifications*
    
      - Add onNegativeAcksSend() method in ConsumerInterceptor interface.
      - Add handler for onNegativeAcksSend() interceptor in ConsumerBase.
      - Favor forEach on ConsumerInterceptor instead of classic for loop by index.
      - Optimize the index-based for loops to avoid computing size() on every iteration.
      - Add a call to onNegativeAcksSend() from NegativeAcksTracker.
    
    * Add test case for onNegativeAcksSend interceptor
---
 .../apache/pulsar/client/api/InterceptorsTest.java | 88 ++++++++++++++++++++++
 .../pulsar/client/api/ConsumerInterceptor.java     | 13 ++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  6 ++
 .../pulsar/client/impl/ConsumerInterceptors.java   | 30 +++++++-
 .../pulsar/client/impl/NegativeAcksTracker.java    |  1 +
 5 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index 36d35a3..3e4eedf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -30,6 +30,9 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class InterceptorsTest extends ProducerConsumerBase {
 
@@ -138,6 +141,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
             public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                 log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
             }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+            }
         };
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -192,6 +200,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
             public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                 log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
             }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+            }
         };
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -254,6 +267,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
             public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
                 log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
             }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+            }
         };
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -321,6 +339,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
                 ackHolder.clear();
                 log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
             }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+            }
         };
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -356,4 +379,69 @@ public class InterceptorsTest extends ProducerConsumerBase {
         producer.close();
         consumer.close();
     }
+
+    @Test(timeOut = 5000)
+    public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
+        final int totalNumOfMessages = 100;
+        CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
+
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                return message;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                messageIds.forEach(messageId -> latch.countDown());
+            }
+        };
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionType(SubscriptionType.Failover)
+                .intercept(interceptor)
+                .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        for (int i = 0; i < totalNumOfMessages; i++) {
+            producer.send("Mock message");
+        }
+
+        for (int i = 0; i < totalNumOfMessages; i++) {
+            Message<String> message = consumer.receive();
+
+            if (i % 2 == 0) {
+                consumer.negativeAcknowledge(message);
+            } else {
+                consumer.acknowledge(message);
+            }
+        }
+
+        latch.await();
+        Assert.assertEquals(latch.getCount(), 0);
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
index ce99df0..7c94d86 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.util.Set;
+
 /**
  * A plugin interface that allows you to intercept (and possibly mutate)
  * messages received by the consumer.
@@ -94,4 +96,15 @@ public interface ConsumerInterceptor<T> extends AutoCloseable {
      * @param exception the exception on acknowledge.
      */
     void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception);
+
+    /**
+     *
+     * This method will be called when a redelivery from a negative acknowledge occurs.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageIds the set of message IDs being redelivered due to a negative acknowledgement.
+     */
+    void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f5ab15a..350ed9c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -389,6 +389,12 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
+    protected void onNegativeAcksSend(Set<MessageId> messageIds) {
+        if (interceptors != null) {
+            interceptors.onNegativeAcksSend(this, messageIds);
+        }
+    }
+
     protected synchronized void incrRefCount() {
         ++refCount;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index a0d30fc..6513b36 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 /**
  * A container that hold the list {@link org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain
@@ -62,7 +63,7 @@ public class ConsumerInterceptors<T> implements Closeable {
      */
     public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
         Message<T> interceptorMessage = message;
-        for (int i = 0; i < interceptors.size(); i++) {
+        for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
             try {
                 interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
             } catch (Exception e) {
@@ -88,7 +89,7 @@ public class ConsumerInterceptors<T> implements Closeable {
      * @param exception exception returned by broker.
      */
     public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
-        for (int i = 0; i < interceptors.size(); i++) {
+        for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
             try {
                 interceptors.get(i).onAcknowledge(consumer, messageId, exception);
             } catch (Exception e) {
@@ -109,7 +110,7 @@ public class ConsumerInterceptors<T> implements Closeable {
      * @param exception exception returned by broker.
      */
     public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
-        for (int i = 0; i < interceptors.size(); i++) {
+        for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
             try {
                 interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception);
             } catch (Exception e) {
@@ -118,9 +119,30 @@ public class ConsumerInterceptors<T> implements Closeable {
         }
     }
 
+    /**
+     * This is called when a redelivery from a negative acknowledge occurs.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onNegativeAcksSend(Consumer, Set)
+     * onNegativeAcksSend(Consumer, Set&lt;MessageId&gt;)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
+     *
+     * @param consumer the consumer which contains the interceptors.
+     * @param messageIds set of message IDs being redelivered due to a negative acknowledgement.
+     */
+    public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) {
+        for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
+            try {
+                interceptors.get(i).onNegativeAcksSend(consumer, messageIds);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onNegativeAcksSend callback", e);
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
-        for (int i = 0; i < interceptors.size(); i++) {
+        for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
             try {
                 interceptors.get(i).close();
             } catch (Exception e) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 0c3ac67..138c761 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -67,6 +67,7 @@ class NegativeAcksTracker {
         });
 
         messagesToRedeliver.forEach(nackedMessages::remove);
+        consumer.onNegativeAcksSend(messagesToRedeliver);
         consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
 
         this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);