You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:22:07 UTC
[pulsar] 13/18: Add log error tracking for semaphore count leak (#12410)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 008452b32db9b3c258944587b8d3ddd86612ea2a
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Wed Oct 20 13:16:24 2021 -0700
Add log error tracking for semaphore count leak (#12410)
Co-authored-by: Ali Ahmed <al...@splunk.com>
(cherry picked from commit 7c219b11966d4eb8cc20111468c3439d23f8777c)
---
.../apache/pulsar/client/impl/ProducerImpl.java | 31 +++++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 31a32da..f5165d7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -151,6 +151,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private Optional<Long> topicEpoch = Optional.empty();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+ private boolean errorState;
+
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -261,6 +263,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
grabCnx();
}
+ protected void semaphoreRelease(final int releaseCountRequest) { // Releases releaseCountRequest permits on the optional semaphore; does nothing when no semaphore is present.
+ if (semaphore.isPresent()) {
+ if (!errorState) { // errorState is set on the first report below, so later calls skip the check and do not log again
+ final int availablePermits = semaphore.get().availablePermits();
+ if (availablePermits - releaseCountRequest < 0) { // NOTE(review): this compares against permits currently free, not permits outstanding; it looks like it can fire on a legitimate release while the semaphore is drained. Confirm intent.
+ log.error("Semaphore permit release count request greater then availablePermits" +
+ " : availablePermits={}, releaseCountRequest={}",
+ availablePermits, releaseCountRequest);
+ errorState = true;
+ }
+ }
+ semaphore.get().release(releaseCountRequest); // the release happens regardless of the check above; the check only logs
+ }
+ }
+
protected OpSendMsgQueue createPendingMessagesQueue() {
return new OpSendMsgQueue();
}
@@ -1022,9 +1039,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
private void releaseSemaphoreForSendOp(OpSendMsg op) {
- if (semaphore.isPresent()) {
- semaphore.get().release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
- }
+
+ semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
+
client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
}
@@ -1778,7 +1795,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
});
pendingMessages.clear();
- semaphore.ifPresent(s -> s.release(releaseCount.get()));
+ semaphoreRelease(releaseCount.get());
if (batchMessagingEnabled) {
failPendingBatchMessages(ex);
}
@@ -1804,7 +1821,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch();
batchMessageContainer.discard(ex);
- semaphore.ifPresent(s -> s.release(numMessagesInBatch));
+ semaphoreRelease(numMessagesInBatch);
}
@Override
@@ -1847,9 +1864,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
processOpSendMsg(opSendMsg);
}
} catch (PulsarClientException e) {
- semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+ semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
} catch (Throwable t) {
- semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
+ semaphoreRelease(batchMessageContainer.getNumMessagesInBatch());
log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
}
}