You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/26 03:35:05 UTC
[pulsar] branch master updated: Reduce unnecessary track message calls. (#4595)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 553f0fd Reduce unnecessary track message calls. (#4595)
553f0fd is described below
commit 553f0fde697239ee72798f09371d0f283df7f6e9
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jun 26 11:35:00 2019 +0800
Reduce unnecessary track message calls. (#4595)
---
.../pulsar/client/api/ConsumerRedeliveryTest.java | 54 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 43 ++++++-----------
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
3 files changed, 70 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index 63c974d..a42d80c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -18,7 +18,11 @@
*/
package org.apache.pulsar.client.api;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -29,6 +33,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Sets;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
@@ -123,4 +128,53 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
consumer2.close();
}
+ @Test
+ public void testUnAckMessageRedeliveryWithReceiveAsync() throws PulsarClientException, ExecutionException, InterruptedException {
+ String topic = "persistent://my-property/my-ns/async-unack-redelivery";
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("s1")
+ .ackTimeout(3, TimeUnit.SECONDS)
+ .subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(5)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .create();
+
+ final int messages = 10;
+ List<CompletableFuture<Message<String>>> futures = new ArrayList<>(10);
+ for (int i = 0; i < messages; i++) {
+ futures.add(consumer.receiveAsync());
+ }
+
+ for (int i = 0; i < messages; i++) {
+ producer.sendAsync("my-message-" + i);
+ }
+
+ int messageReceived = 0;
+ for (CompletableFuture<Message<String>> future : futures) {
+ Message<String> message = future.get();
+ assertNotNull(message);
+ messageReceived++;
+ // Don't ack message, wait for ack timeout.
+ }
+
+ assertEquals(10, messageReceived);
+
+ for (int i = 0; i < messages; i++) {
+ Message<String> message = consumer.receive();
+ assertNotNull(message);
+ messageReceived++;
+ consumer.acknowledge(message);
+ }
+
+ assertEquals(20, messageReceived);
+
+ producer.close();
+ consumer.close();
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a25a98a..8704fd3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -312,10 +312,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Message<T> message;
try {
message = incomingMessages.take();
- trackMessage(message);
- Message<T> interceptMsg = beforeConsume(message);
- messageProcessed(interceptMsg);
- return interceptMsg;
+ messageProcessed(message);
+ return beforeConsume(message);
} catch (InterruptedException e) {
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
@@ -341,10 +339,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
if (message != null) {
- trackMessage(message);
- Message<T> interceptMsg = beforeConsume(message);
- messageProcessed(interceptMsg);
- result.complete(interceptMsg);
+ messageProcessed(message);
+ result.complete(beforeConsume(message));
}
return result;
@@ -355,12 +351,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
- trackMessage(message);
- Message<T> interceptMsg = beforeConsume(message);
- if (interceptMsg != null) {
- messageProcessed(interceptMsg);
+ if (message == null) {
+ return null;
}
- return interceptMsg;
+ messageProcessed(message);
+ return beforeConsume(message);
} catch (InterruptedException e) {
State state = getState();
if (state != State.Closing && state != State.Closed) {
@@ -821,7 +816,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
}
if (!pendingReceives.isEmpty()) {
- trackMessage(message);
notifyPendingReceivedCallback(message, null);
} else if (canEnqueueMessage(message)) {
incomingMessages.add(message);
@@ -1044,19 +1038,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
increaseAvailablePermits(currentCnx);
stats.updateNumMsgsReceived(msg);
- if (conf.getAckTimeoutMillis() != 0) {
- // reset timer for messages that are received by the client
- MessageIdImpl id = (MessageIdImpl) msg.getMessageId();
- if (id instanceof BatchMessageIdImpl) {
- id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
- }
- if (partitionIndex != -1) {
- // we should no longer track this message, TopicsConsumer will take care from now onwards
- unAckedMessageTracker.remove(id);
- } else {
- unAckedMessageTracker.add(id);
- }
- }
+ trackMessage(msg);
}
protected void trackMessage(Message<?> msg) {
@@ -1068,7 +1050,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// do not add each item in batch message into tracker
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
}
- unAckedMessageTracker.add(id);
+ if (partitionIndex != -1) {
+ // we should no longer track this message, TopicsConsumer will take care from now onwards
+ unAckedMessageTracker.remove(id);
+ } else {
+ unAckedMessageTracker.add(id);
+ }
}
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a5e5138..5d45ca0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -260,7 +260,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
try {
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
- unAckedMessageTracker.add(topicMessage.getMessageId());
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
@@ -270,6 +269,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
if (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+ unAckedMessageTracker.add(topicMessage.getMessageId());
listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
} else {
// Enqueue the message so that it can be retrieved when application calls receive()