You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:27 UTC
[pulsar] 18/25: fix batchReceiveAsync not completed exceptionally
when closing Consumer (#7661)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 180e5f26e4afdfda178e7253fb22a8b21b2c7f54
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jul 28 14:02:34 2020 +0800
fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661)
### Motivation
CompletableFuture<Messages<T>> from Consumer.batchReceiveAsync() not completed exceptionnally when closing Consumer.
### Modifications
pendingBatchReceives was not cleaned up when the connection was closed, so I added pendingBatchReceives cleanup.
(cherry picked from commit 48156ad9a5c2e0d85813367bcaaf6ea845fffc2c)
---
.../client/api/SimpleProducerConsumerTest.java | 74 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 26 +++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 34 +++-------
.../client/impl/MultiTopicsConsumerImpl.java | 11 +---
4 files changed, 111 insertions(+), 34 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index dce6c10..669e259 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -3352,4 +3352,78 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(timeOut = 5000)
+ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
+ final String topic = "persistent://my-property/my-ns/testCompletedWhenClosing";
+ final String partitionedTopic = "persistent://my-property/my-ns/testCompletedWhenClosing-partitioned";
+ final String errorMsg = "cleaning and closing the consumers";
+ BatchReceivePolicy batchReceivePolicy
+ = BatchReceivePolicy.builder().maxNumBytes(10 * 1024).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName("my-subscriber-name")
+ .batchReceivePolicy(batchReceivePolicy).subscribe();
+ // 1) Test receiveAsync is interrupted
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ consumer.receiveAsync().get();
+ Assert.fail("should be interrupted");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(errorMsg));
+ countDownLatch.countDown();
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ consumer.close();
+ } catch (PulsarClientException ignore) {
+ }
+ }).start();
+ countDownLatch.await();
+
+ // 2) Test batchReceiveAsync is interrupted
+ CountDownLatch countDownLatch2 = new CountDownLatch(1);
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName("my-subscriber-name")
+ .batchReceivePolicy(batchReceivePolicy).subscribe();
+ new Thread(() -> {
+ try {
+ consumer2.batchReceiveAsync().get();
+ Assert.fail("should be interrupted");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(errorMsg));
+ countDownLatch2.countDown();
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ consumer2.close();
+ } catch (PulsarClientException ignore) {
+ }
+ }).start();
+ countDownLatch2.await();
+ // 3) Test partitioned topic batchReceiveAsync is interrupted
+ CountDownLatch countDownLatch3 = new CountDownLatch(1);
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+ Consumer<String> partitionedTopicConsumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(partitionedTopic).subscriptionName("my-subscriber-name-partitionedTopic")
+ .batchReceivePolicy(batchReceivePolicy).subscribe();
+ new Thread(() -> {
+ try {
+ partitionedTopicConsumer.batchReceiveAsync().get();
+ Assert.fail("should be interrupted");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(errorMsg));
+ countDownLatch3.countDown();
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ partitionedTopicConsumer.close();
+ } catch (PulsarClientException ignore) {
+ }
+ }).start();
+ countDownLatch3.await();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3d45931..5391cb6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -195,6 +195,30 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
}
+ protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+ while (!pendingReceives.isEmpty()) {
+ CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
+ if (receiveFuture == null) {
+ break;
+ }
+ receiveFuture.completeExceptionally(
+ new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+ "was already closed when cleaning and closing the consumers", topic, subscription)));
+ }
+ }
+
+ protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
+ while (!pendingBatchReceives.isEmpty()) {
+ OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+ if (opBatchReceive == null || opBatchReceive.future == null) {
+ break;
+ }
+ opBatchReceive.future.completeExceptionally(
+ new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+ "was already closed when cleaning and closing the consumers", topic, subscription)));
+ }
+ }
+
abstract protected Messages<T> internalBatchReceive() throws PulsarClientException;
abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync();
@@ -405,7 +429,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
TransactionImpl txn);
protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
- Map<String,Long> properties,
+ Map<String,Long> properties,
long delayTime,
TimeUnit unit);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1757f1e..10320da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -40,8 +39,6 @@ import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -172,9 +168,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private volatile Producer<T> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
-
+
protected volatile boolean paused;
-
+
protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
private int pendingChunckedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
@@ -560,7 +556,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@SuppressWarnings("unchecked")
@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
- Map<String,Long> properties,
+ Map<String,Long> properties,
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
@@ -620,7 +616,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = reconsumetimes + 1;
-
+
} else {
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
@@ -628,7 +624,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
-
+
if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) {
processPossibleToDLQ((MessageIdImpl)messageId);
if (deadLetterProducer == null) {
@@ -996,18 +992,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
- while (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
- if (receiveFuture != null) {
- receiveFuture.completeExceptionally(
- new PulsarClientException.AlreadyClosedException(
- String.format("The consumer which subscribes the topic %s with subscription name %s " +
- "was already closed when cleaning and closing the consumers",
- topicName.toString(), subscription)));
- } else {
- break;
- }
- }
+ failPendingReceives(this.pendingReceives);
+ failPendingBatchReceives(this.pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
@@ -1083,7 +1069,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
-
+
// right now, chunked messages are only supported by non-shared subscription
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
@@ -1152,7 +1138,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
TimeUnit.MILLISECONDS);
expireChunkMessageTaskScheduled = true;
}
-
+
if (msgMetadata.getChunkId() == 0) {
ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
msgMetadata.getTotalChunkMsgSize());
@@ -1222,7 +1208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
compressedPayload.release();
return uncompressedPayload;
}
-
+
protected void triggerListener(int numMessages) {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7f44836..50e4db9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -566,15 +566,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
- while (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
- if (receiveFuture != null) {
- receiveFuture.completeExceptionally(
- new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
- } else {
- break;
- }
- }
+ failPendingReceives(pendingReceives);
+ failPendingBatchReceives(pendingBatchReceives);
}
} finally {
lock.readLock().unlock();