Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/21 04:05:49 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #11691: [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive

codelipenghui commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r693297930



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -992,9 +1002,9 @@ protected void clearIncomingMessages() {
     private ExecutorService getExecutor(Message<T> msg) {
         ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
                 : null;
-        ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null
-                ? receivedConsumer.pinnedExecutor
-                : pinnedExecutor;
+        ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
+                ? receivedConsumer.externalPinnedExecutor
+                : externalPinnedExecutor;

Review comment:
       It looks like we need two methods: one for getting the internal executor and one for getting the external executor. This method is used in two places: `completePendingReceive()`, which should use the internal executor, and `triggerListener()`, which should use the external executor.
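
       A minimal sketch of what the two-method split could look like. `ConsumerSketch`, `getInternalExecutor` and `getExternalExecutor` are hypothetical names used only for illustration; the real method takes a `Message<T>` and resolves the receiving consumer from it, which is simplified away here.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical, simplified stand-in for the consumer classes in the diff.
class ConsumerSketch {
    final ExecutorService internalPinnedExecutor;
    final ExecutorService externalPinnedExecutor;

    ConsumerSketch(ExecutorService internalPinnedExecutor, ExecutorService externalPinnedExecutor) {
        this.internalPinnedExecutor = internalPinnedExecutor;
        this.externalPinnedExecutor = externalPinnedExecutor;
    }

    // For consumer bookkeeping such as completePendingReceive(): prefer the
    // receiving consumer's internal executor, otherwise fall back to our own.
    ExecutorService getInternalExecutor(ConsumerSketch receivedConsumer) {
        return receivedConsumer != null && receivedConsumer.internalPinnedExecutor != null
                ? receivedConsumer.internalPinnedExecutor
                : internalPinnedExecutor;
    }

    // For work that runs user code, such as triggerListener(): same selection
    // rule, but on the external executor.
    ExecutorService getExternalExecutor(ConsumerSketch receivedConsumer) {
        return receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
                ? receivedConsumer.externalPinnedExecutor
                : externalPinnedExecutor;
    }
}
```

       With this shape each call site states which executor it wants, instead of one shared helper returning the same executor for both.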

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1220,19 +1208,19 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
             return;
         }
 
         if (exception != null) {
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));

Review comment:
       Shouldn't this be `internalPinnedExecutor`?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -2098,7 +2086,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
                 return;
             }
 
-            pinnedExecutor.schedule(() -> {
+            externalPinnedExecutor.schedule(() -> {

Review comment:
       This should be `internalPinnedExecutor`.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1132,7 +1120,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            pinnedExecutor.scheduleAtFixedRate(() -> {
+            externalPinnedExecutor.scheduleAtFixedRate(() -> {

Review comment:
       This looks like an issue unrelated to what this PR is meant to fix, but I think this should be `internalPinnedExecutor`: the task does not call the user's code, it only does internal processing for the consumer.
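
       To illustrate the distinction, here is a small standalone sketch (not Pulsar code). It assumes the point of the split is that user callbacks on the external executor must not stall internal work; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExecutorSplitSketch {
    // Returns true if an internal task completes while a simulated user
    // callback is still blocking the external executor.
    static boolean internalRunsWhileUserCodeBlocks() {
        ExecutorService internal = Executors.newSingleThreadExecutor();
        ExecutorService external = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Simulated user callback: blocks until released.
            external.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Internal housekeeping runs on its own thread, so it is not
            // queued behind the blocked user callback.
            Future<Boolean> housekeeping = internal.submit(() -> Boolean.TRUE);
            return housekeeping.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            release.countDown(); // unblock the simulated user callback
            internal.shutdown();
            external.shutdown();
        }
    }
}
```

       If both tasks shared one single-threaded executor, the housekeeping task would sit in the queue until the user callback returned.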

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -865,32 +863,44 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
+            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();
 
-            while (firstOpBatchReceive != null) {
+            while (opBatchReceive != null) {
                 // If there is at least one batch receive, calculate the diff between the batch receive timeout
                 // and the elapsed time since the operation was created.
                 long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
+
                 if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+                    completeOpBatchReceive(opBatchReceive);
+
+                    // remove the peeked item from the queue
+                    OpBatchReceive<T> removed = pendingBatchReceives.poll();
+
+                    if (removed != opBatchReceive) {
+                        // regression check, if this were to happen due to incorrect code changes in the future,
+                        // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
+                        // to avoid blocking user code
+
+                        log.error("Race condition in consumer {} (should not cause data loss). "
+                                + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
+                        if (!removed.future.isDone()) {

Review comment:
       Can `removed` be null here?
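
       `Queue.poll()` returns null when the queue is empty, so a guard along these lines may be needed. This is only a sketch: `removePeeked` is a hypothetical helper, and plain `CompletableFuture`s stand in for `OpBatchReceive` and its `future` field.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class PollNullCheckSketch {
    // Removes the head of the queue and reports whether it was the item that
    // had been peeked. A mismatched, non-null head is completed exceptionally
    // so that its caller is not left blocked.
    static <T> boolean removePeeked(Queue<CompletableFuture<T>> queue, CompletableFuture<T> peeked) {
        CompletableFuture<T> removed = queue.poll(); // null if the queue is empty
        if (removed == peeked) {
            return true;
        }
        if (removed != null && !removed.isDone()) {
            removed.completeExceptionally(
                    new IllegalStateException("queue head changed between peek() and poll()"));
        }
        return false;
    }
}
```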

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    protected void failPendingReceives() {
         while (!pendingReceives.isEmpty()) {
             CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
             if (receiveFuture == null) {
                 break;
             }
             if (!receiveFuture.isDone()) {
                 receiveFuture.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
     }
 
-    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
-        while (!pendingBatchReceives.isEmpty()) {
-            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+    protected void failPendingBatchReceives() {

Review comment:
       ```suggestion
       private void failPendingBatchReceives() {
       ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1220,19 +1208,19 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
             return;
         }
 
         if (exception != null) {
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
             return;
         }
 
         if (message == null) {
             IllegalStateException e = new IllegalStateException("received message can't be null");
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
+            externalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));

Review comment:
       Shouldn't this be `internalPinnedExecutor`?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    protected void failPendingReceives() {

Review comment:
       ```suggestion
       private void failPendingReceives() {
       ```
