You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:27 UTC

[pulsar] 18/25: fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 180e5f26e4afdfda178e7253fb22a8b21b2c7f54
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jul 28 14:02:34 2020 +0800

    fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661)
    
    ### Motivation
    
    The CompletableFuture<Messages<T>> returned by Consumer.batchReceiveAsync() is not completed exceptionally when the Consumer is closed.
    
    ### Modifications
    
    pendingBatchReceives was not cleaned up when the connection was closed, so I added pendingBatchReceives cleanup.
    
    (cherry picked from commit 48156ad9a5c2e0d85813367bcaaf6ea845fffc2c)
---
 .../client/api/SimpleProducerConsumerTest.java     | 74 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 26 +++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 34 +++-------
 .../client/impl/MultiTopicsConsumerImpl.java       | 11 +---
 4 files changed, 111 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index dce6c10..669e259 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -3352,4 +3352,78 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test(timeOut = 5000)
+    public void testReceiveAsyncCompletedWhenClosing() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testCompletedWhenClosing";
+        final String partitionedTopic = "persistent://my-property/my-ns/testCompletedWhenClosing-partitioned";
+        final String errorMsg = "cleaning and closing the consumers";
+        BatchReceivePolicy batchReceivePolicy
+                = BatchReceivePolicy.builder().maxNumBytes(10 * 1024).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName("my-subscriber-name")
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        // 1) Test receiveAsync is interrupted
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                consumer.receiveAsync().get();
+                Assert.fail("should be interrupted");
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(errorMsg));
+                countDownLatch.countDown();
+            }
+        }).start();
+        new Thread(() -> {
+            try {
+                consumer.close();
+            } catch (PulsarClientException ignore) {
+            }
+        }).start();
+        countDownLatch.await();
+
+        // 2) Test batchReceiveAsync is interrupted
+        CountDownLatch countDownLatch2 = new CountDownLatch(1);
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName("my-subscriber-name")
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        new Thread(() -> {
+            try {
+                consumer2.batchReceiveAsync().get();
+                Assert.fail("should be interrupted");
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(errorMsg));
+                countDownLatch2.countDown();
+            }
+        }).start();
+        new Thread(() -> {
+            try {
+                consumer2.close();
+            } catch (PulsarClientException ignore) {
+            }
+        }).start();
+        countDownLatch2.await();
+        // 3) Test partitioned topic batchReceiveAsync is interrupted
+        CountDownLatch countDownLatch3 = new CountDownLatch(1);
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+        Consumer<String> partitionedTopicConsumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(partitionedTopic).subscriptionName("my-subscriber-name-partitionedTopic")
+                .batchReceivePolicy(batchReceivePolicy).subscribe();
+        new Thread(() -> {
+            try {
+                partitionedTopicConsumer.batchReceiveAsync().get();
+                Assert.fail("should be interrupted");
+            } catch (Exception e) {
+                Assert.assertTrue(e.getMessage().contains(errorMsg));
+                countDownLatch3.countDown();
+            }
+        }).start();
+        new Thread(() -> {
+            try {
+                partitionedTopicConsumer.close();
+            } catch (PulsarClientException ignore) {
+            }
+        }).start();
+        countDownLatch3.await();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3d45931..5391cb6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -195,6 +195,30 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
+    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+        while (!pendingReceives.isEmpty()) {
+            CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
+            if (receiveFuture == null) {
+                break;
+            }
+            receiveFuture.completeExceptionally(
+                    new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                            "was already closed when cleaning and closing the consumers", topic, subscription)));
+        }
+    }
+
+    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
+        while (!pendingBatchReceives.isEmpty()) {
+            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+            if (opBatchReceive == null || opBatchReceive.future == null) {
+                break;
+            }
+            opBatchReceive.future.completeExceptionally(
+                    new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                            "was already closed when cleaning and closing the consumers", topic, subscription)));
+        }
+    }
+
     abstract protected Messages<T> internalBatchReceive() throws PulsarClientException;
 
     abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync();
@@ -405,7 +429,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                                                              TransactionImpl txn);
 
     protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
-                                                                Map<String,Long> properties, 
+                                                                Map<String,Long> properties,
                                                                 long delayTime,
                                                                 TimeUnit unit);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1757f1e..10320da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -40,8 +39,6 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -172,9 +168,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private volatile Producer<T> retryLetterProducer;
     private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
-    
+
     protected volatile boolean paused;
-    
+
     protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
     private int pendingChunckedMessageCount = 0;
     protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
@@ -560,7 +556,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     @SuppressWarnings("unchecked")
     @Override
     protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
-                                                       Map<String,Long> properties, 
+                                                       Map<String,Long> properties,
                                                        long delayTime,
                                                        TimeUnit unit) {
         MessageId messageId = message.getMessageId();
@@ -620,7 +616,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                     reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
                     reconsumetimes = reconsumetimes + 1;
-                   
+
                 } else {
                     propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
                     propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
@@ -628,7 +624,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
-                
+
                if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) {
                    processPossibleToDLQ((MessageIdImpl)messageId);
                     if (deadLetterProducer == null) {
@@ -996,18 +992,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         lock.readLock().lock();
         try {
             if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
-                while (!pendingReceives.isEmpty()) {
-                    CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
-                    if (receiveFuture != null) {
-                        receiveFuture.completeExceptionally(
-                            new PulsarClientException.AlreadyClosedException(
-                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
-                                        "was already closed when cleaning and closing the consumers",
-                                    topicName.toString(), subscription)));
-                    } else {
-                        break;
-                    }
-                }
+                failPendingReceives(this.pendingReceives);
+                failPendingBatchReceives(this.pendingBatchReceives);
             }
         } finally {
             lock.readLock().unlock();
@@ -1083,7 +1069,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
-            
+
             // right now, chunked messages are only supported by non-shared subscription
             if (isChunkedMessage) {
                 uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
@@ -1152,7 +1138,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     TimeUnit.MILLISECONDS);
             expireChunkMessageTaskScheduled = true;
         }
-        
+
         if (msgMetadata.getChunkId() == 0) {
             ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
                     msgMetadata.getTotalChunkMsgSize());
@@ -1222,7 +1208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         compressedPayload.release();
         return uncompressedPayload;
     }
-    
+
     protected void triggerListener(int numMessages) {
         // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
         // thread while the message processing happens
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7f44836..50e4db9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -566,15 +566,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         lock.readLock().lock();
         try {
             if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
-                while (!pendingReceives.isEmpty()) {
-                    CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
-                    if (receiveFuture != null) {
-                        receiveFuture.completeExceptionally(
-                                new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
-                    } else {
-                        break;
-                    }
-                }
+                failPendingReceives(pendingReceives);
+                failPendingBatchReceives(pendingBatchReceives);
             }
         } finally {
             lock.readLock().unlock();