You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:22:07 UTC

[pulsar] 13/18: Add log error tracking for semaphore count leak (#12410)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 008452b32db9b3c258944587b8d3ddd86612ea2a
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Wed Oct 20 13:16:24 2021 -0700

    Add log error tracking for semaphore count leak (#12410)
    
    Co-authored-by: Ali Ahmed <al...@splunk.com>
    (cherry picked from commit 7c219b11966d4eb8cc20111468c3439d23f8777c)
---
 .../apache/pulsar/client/impl/ProducerImpl.java    | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 31a32da..f5165d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -151,6 +151,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     private Optional<Long> topicEpoch = Optional.empty();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
 
+    private boolean errorState;
+
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -261,6 +263,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         grabCnx();
     }
 
+    protected void semaphoreRelease(final int releaseCountRequest) { // Releases releaseCountRequest permits, logging an error once if the request looks inconsistent.
+        if (semaphore.isPresent()) { // No-op when no semaphore is present.
+            if (!errorState) { // Skip the check once an error has been logged; errorState is never reset in this method.
+                final int availablePermits = semaphore.get().availablePermits();
+                if (availablePermits - releaseCountRequest < 0) { // NOTE(review): assuming java.util.concurrent.Semaphore semantics, availablePermits() is the FREE count, so this also fires on a legitimate release while the semaphore is fully acquired - confirm whether outstanding permits were intended.
+                    log.error("Semaphore permit release count request greater then availablePermits" +
+                                    " : availablePermits={}, releaseCountRequest={}",
+                            availablePermits, releaseCountRequest);
+                    errorState = true; // Latch so the error above is not logged again by later calls.
+                }
+            }
+            semaphore.get().release(releaseCountRequest); // The release always happens; the check above is diagnostic only.
+        }
+    }
+
     protected OpSendMsgQueue createPendingMessagesQueue() {
         return new OpSendMsgQueue();
     }
@@ -1022,9 +1039,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     private void releaseSemaphoreForSendOp(OpSendMsg op) {
-        if (semaphore.isPresent()) {
-            semaphore.get().release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
-        }
+
+        semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
+
         client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
     }
 
@@ -1778,7 +1795,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             });
 
             pendingMessages.clear();
-            semaphore.ifPresent(s -> s.release(releaseCount.get()));
+            semaphoreRelease(releaseCount.get());
             if (batchMessagingEnabled) {
                 failPendingBatchMessages(ex);
             }
@@ -1804,7 +1821,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
         final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
         batchMessageContainer.discard(ex);
-        semaphore.ifPresent(s -> s.release(numMessagesInBatch));
+        semaphoreRelease(numMessagesInBatch);
     }
 
     @Override
@@ -1847,9 +1864,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     processOpSendMsg(opSendMsg);
                 }
             } catch (PulsarClientException e) {
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
             } catch (Throwable t) {
-                semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+                semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
                 log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
             }
         }