You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/22 21:54:12 UTC
[pulsar] branch master updated: Feature / Interceptor for negative
ack redelivery (#3962)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7890750 Feature / Interceptor for negative ack redelivery (#3962)
7890750 is described below
commit 789075013b404780a62c89b2acfea343680d73b3
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Mon Apr 22 18:54:07 2019 -0300
Feature / Interceptor for negative ack redelivery (#3962)
* Feature / Interceptor for negative ack redelivery
*Motivation*
In some scenarios is it helpful to be able to set interceptor for redeliveries
being happening due to negative acknowledge.
*Modifications*
- Add onNegativeAcksSend() method in ConsumerInterceptor interface.
- Add handler for onNegativeAcksSend() interceptor in ConsumerBase.
- Favor forEach on ConsumerInterceptor instead of classic for loop by index.
- Optimization for each by index to avoid compute size() every iteration.
- Add call method to onNegativeAckRedelivery() from NegativeAcksTracker.
* Add test case for onNegativeAcksSend interceptor
---
.../apache/pulsar/client/api/InterceptorsTest.java | 88 ++++++++++++++++++++++
.../pulsar/client/api/ConsumerInterceptor.java | 13 ++++
.../apache/pulsar/client/impl/ConsumerBase.java | 6 ++
.../pulsar/client/impl/ConsumerInterceptors.java | 30 +++++++-
.../pulsar/client/impl/NegativeAcksTracker.java | 1 +
5 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index 36d35a3..3e4eedf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -30,6 +30,9 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class InterceptorsTest extends ProducerConsumerBase {
@@ -138,6 +141,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+ }
};
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -192,6 +200,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+ }
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -254,6 +267,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+ }
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -321,6 +339,11 @@ public class InterceptorsTest extends ProducerConsumerBase {
ackHolder.clear();
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+ }
};
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
@@ -356,4 +379,69 @@ public class InterceptorsTest extends ProducerConsumerBase {
producer.close();
consumer.close();
}
+
+ @Test(timeOut = 5000)
+ public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
+ final int totalNumOfMessages = 100;
+ CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
+
+ ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+ return message;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+ messageIds.forEach(messageId -> latch.countDown());
+ }
+ };
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionType(SubscriptionType.Failover)
+ .intercept(interceptor)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .create();
+
+ for (int i = 0; i < totalNumOfMessages; i++) {
+ producer.send("Mock message");
+ }
+
+ for (int i = 0; i < totalNumOfMessages; i++) {
+ Message<String> message = consumer.receive();
+
+ if (i % 2 == 0) {
+ consumer.negativeAcknowledge(message);
+ } else {
+ consumer.acknowledge(message);
+ }
+ }
+
+ latch.await();
+ Assert.assertEquals(latch.getCount(), 0);
+
+ producer.close();
+ consumer.close();
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
index ce99df0..7c94d86 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;
+import java.util.Set;
+
/**
* A plugin interface that allows you to intercept (and possibly mutate)
* messages received by the consumer.
@@ -94,4 +96,15 @@ public interface ConsumerInterceptor<T> extends AutoCloseable {
* @param exception the exception on acknowledge.
*/
void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception);
+
+ /**
+ *
+ * This method will be called when a redelivery from a negative acknowledge occurs.
+ *
+ * <p>Any exception thrown by this method will be ignored by the caller.
+ *
+ * @param consumer the consumer which contains the interceptor
+ * @param messageIds message to ack, null if acknowledge fail.
+ */
+ void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f5ab15a..350ed9c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -389,6 +389,12 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
}
+ protected void onNegativeAcksSend(Set<MessageId> messageIds) {
+ if (interceptors != null) {
+ interceptors.onNegativeAcksSend(this, messageIds);
+ }
+ }
+
protected synchronized void incrRefCount() {
++refCount;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index a0d30fc..6513b36 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
/**
* A container that hold the list {@link org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain
@@ -62,7 +63,7 @@ public class ConsumerInterceptors<T> implements Closeable {
*/
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
Message<T> interceptorMessage = message;
- for (int i = 0; i < interceptors.size(); i++) {
+ for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
} catch (Exception e) {
@@ -88,7 +89,7 @@ public class ConsumerInterceptors<T> implements Closeable {
* @param exception exception returned by broker.
*/
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
- for (int i = 0; i < interceptors.size(); i++) {
+ for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledge(consumer, messageId, exception);
} catch (Exception e) {
@@ -109,7 +110,7 @@ public class ConsumerInterceptors<T> implements Closeable {
* @param exception exception returned by broker.
*/
public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
- for (int i = 0; i < interceptors.size(); i++) {
+ for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception);
} catch (Exception e) {
@@ -118,9 +119,30 @@ public class ConsumerInterceptors<T> implements Closeable {
}
}
+ /**
+ * This is called when a redelivery from a negative acknowledge occurs.
+ * <p>
+ * This method calls {@link ConsumerInterceptor#onNegativeAcksSend(Consumer, Set)
+ * onNegativeAcksSend(Consumer, Set<MessageId>)} method for each interceptor.
+ * <p>
+ * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
+ *
+ * @param consumer the consumer which contains the interceptors.
+ * @param messageIds set of message IDs being redelivery due a negative acknowledge.
+ */
+ public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) {
+ for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
+ try {
+ interceptors.get(i).onNegativeAcksSend(consumer, messageIds);
+ } catch (Exception e) {
+ log.warn("Error executing interceptor onNegativeAcksSend callback", e);
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
- for (int i = 0; i < interceptors.size(); i++) {
+ for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptors.get(i).close();
} catch (Exception e) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 0c3ac67..138c761 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -67,6 +67,7 @@ class NegativeAcksTracker {
});
messagesToRedeliver.forEach(nackedMessages::remove);
+ consumer.onNegativeAcksSend(messagesToRedeliver);
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);