You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/18 03:04:19 UTC

[GitHub] [pulsar] 315157973 commented on a change in pull request #11691: [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive

315157973 commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r690863726



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -82,6 +79,7 @@
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
     protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
+    protected Lock pendingBatchLock;

Review comment:
       This field should be declared `final`.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -865,32 +842,42 @@ private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
 
-            while (firstOpBatchReceive != null) {
+            while (hasNextBatchReceive()) {
                 // If there is at least one batch receive, calculate the diff between the batch receive timeout
                 // and the elapsed time since the operation was created.
-                long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
-                if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+
+                boolean timeoutTriggered = false;
+                OpBatchReceive<T> opBatchReceive = null;
+                try {
+                    pendingBatchLock.lock();

Review comment:
       Please put the `lock()` call outside (before) the `try`.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -204,26 +205,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
         }
     }
 
-    protected CompletableFuture<Message<T>> peekPendingReceive() {
-        CompletableFuture<Message<T>> receivedFuture = null;
-        while (receivedFuture == null) {
-            receivedFuture = pendingReceives.peek();
-            if (receivedFuture == null) {
-                break;
-            }
-            // skip done futures (cancelling a future could mark it done)
-            if (receivedFuture.isDone()) {
-                CompletableFuture<Message<T>> removed = pendingReceives.poll();
-                if (removed != receivedFuture) {
-                    log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
-                }
-                receivedFuture = null;
-            }
-        }
-        return receivedFuture;
+    protected boolean hasNextPendingReceive() {

Review comment:
       This is not thread-safe. If multiple threads call this method at the same time, it can return true for all of them,
   which eventually causes multiple notifications.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -790,31 +778,20 @@ protected void notifyPendingBatchReceivedCallBack() {
         }
     }
 
-    private OpBatchReceive<T> peekNextBatchReceive() {
-        OpBatchReceive<T> opBatchReceive = null;
-        while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.peek();
-            // no entry available
-            if (opBatchReceive == null) {
-                return null;
-            }
-            // remove entries where future is null or has been completed (cancel / timeout)
-            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
-                OpBatchReceive<T> removed = pendingBatchReceives.poll();
-                if (removed != opBatchReceive) {
-                    log.error("Bug: Removed entry wasn't the expected one. expected={}, removed={}", opBatchReceive, removed);
-                }
-                opBatchReceive = null;
-            }
-        }
-        return opBatchReceive;
+    private boolean hasNextBatchReceive() {
+        return !pendingBatchReceives.isEmpty();
     }
 
 
-    private OpBatchReceive<T> pollNextBatchReceive() {
+    private OpBatchReceive<T> nextBatchReceive() {
         OpBatchReceive<T> opBatchReceive = null;
         while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.poll();
+            try {
+                pendingBatchLock.lock();

Review comment:
       Please put the `lock()` call outside the `try`; otherwise `unlock()` will still be executed even if acquiring the lock fails.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org