You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by wa...@apache.org on 2022/03/31 20:46:04 UTC
[pulsar] branch branch-2.8 updated: Revert "Fix Regression in Consumer Performance"
This is an automated email from the ASF dual-hosted git repository.
wave pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 328f30c Revert "Fix Regression in Consumer Performance"
328f30c is described below
commit 328f30c1fe0396a5e39f69b15133f979366df62a
Author: Dave Fisher <da...@datastax.com>
AuthorDate: Thu Mar 31 13:40:24 2022 -0700
Revert "Fix Regression in Consumer Performance"
This reverts commit 2a786c10c01b0686e7bce0eba07035c9989a0bad.
---
.../apache/pulsar/client/impl/ConsumerBase.java | 48 +++++++++++-----------
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
2 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index b0b4879..dd21666 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,33 +908,35 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected void triggerListener() {
// Use internalPinnedExecutor to maintain message ordering
- try {
- // Listener should only have one pending/running executable to process a message
- // See https://github.com/apache/pulsar/issues/11008 for context.
- if (!isListenerHandlingMessage) {
- final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
- if (msg != null) {
- isListenerHandlingMessage = true;
- // Trigger the notification on the message listener in a separate thread to avoid blocking the
- // internal pinned executor thread while the message processing happens
- if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
- executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
- callMessageListener(msg));
- } else {
- getExternalExecutor(msg).execute(() -> {
- callMessageListener(msg);
- });
+ internalPinnedExecutor.execute(() -> {
+ try {
+ // Listener should only have one pending/running executable to process a message
+ // See https://github.com/apache/pulsar/issues/11008 for context.
+ if (!isListenerHandlingMessage) {
+ final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+ if (msg != null) {
+ isListenerHandlingMessage = true;
+ // Trigger the notification on the message listener in a separate thread to avoid blocking the
+ // internal pinned executor thread while the message processing happens
+ if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
+ executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+ callMessageListener(msg));
+ } else {
+ getExternalExecutor(msg).execute(() -> {
+ callMessageListener(msg);
+ });
+ }
}
}
+ } catch (PulsarClientException e) {
+ log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+ return;
}
- } catch (PulsarClientException e) {
- log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
- return;
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+ }
+ });
}
protected void callMessageListener(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cd111f0..ebf3583 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,8 +1085,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
uncompressedPayload.release();
}
- internalPinnedExecutor.execute(()
- -> tryTriggerListener());
+ tryTriggerListener();
}