You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/10 11:29:40 UTC

[pulsar] branch branch-2.11 updated (5c9947744b5 -> 80d2cc9f3f8)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 5c9947744b5 [fix][broker] Fix memory leak if entry exists in cache (#16996)
     new 17894f1625a [fix][rest] Fix swagger shows lookup API path error. (#17013)
     new dad4db16b4c [fix][broker]remove exception log when access status.html (#17025)
     new 436ee4bbb0e [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
     new 80d2cc9f3f8 [cleanup][broker] Follow up on #16968 to restore some behavior in PersistentDispatcherMultipleConsumers class (#17018)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/common/configuration/VipStatus.java     |  4 ++-
 pulsar-broker/pom.xml                              |  2 +-
 .../PersistentDispatcherMultipleConsumers.java     | 34 +++++++++++++---------
 3 files changed, 25 insertions(+), 15 deletions(-)


[pulsar] 03/04: [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 436ee4bbb0e6a09879a3e09a7018dc05a75c3826
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Aug 9 01:30:46 2022 +0800

    [fix][broker]Prevent `StackOverFlowException` in SHARED subscription(#16968)
---
 .../PersistentDispatcherMultipleConsumers.java     | 37 +++++++++++++---------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a02b76c9aed..4887f6b0541 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-    protected boolean sendInProgress;
+    protected volatile boolean sendInProgress;
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -244,6 +244,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         readMoreEntries();
     }
 
+    /**
+     * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
+     *
+     */
+    public void readMoreEntiresAsync() {
+        topic.getBrokerService().executor().execute(() -> readMoreEntries());
+    }
+
     public synchronized void readMoreEntries() {
         if (sendInProgress) {
             // we cannot read more entries while sending the previous batch
@@ -287,9 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
-                    // We should not call readMoreEntries() recursively in the same thread
-                    // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                    readMoreEntiresAsync();
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 if (log.isDebugEnabled()) {
@@ -544,24 +550,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
             // setting sendInProgress here, because sendMessagesToConsumers will be executed
             // in a separate thread, and we want to prevent more reads
-            sendInProgress = true;
-            dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+            dispatchMessagesThread.execute(safeRun(() -> {
+                if (sendMessagesToConsumers(readType, entries)) {
+                    readMoreEntries();
+                }
+            }));
         } else {
-            sendMessagesToConsumers(readType, entries);
+            if (sendMessagesToConsumers(readType, entries)) {
+                readMoreEntiresAsync();
+            }
         }
     }
 
-    protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         sendInProgress = true;
-        boolean readMoreEntries;
         try {
-            readMoreEntries = trySendMessagesToConsumers(readType, entries);
+            return trySendMessagesToConsumers(readType, entries);
         } finally {
             sendInProgress = false;
         }
-        if (readMoreEntries) {
-            readMoreEntries();
-        }
     }
 
     /**
@@ -916,7 +923,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
                 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
             log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
-            topic.getBrokerService().executor().execute(() -> readMoreEntries());
+            readMoreEntiresAsync();
         }
 
         int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -939,7 +946,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                readMoreEntiresAsync();
             }
         }
         // increment broker-level count


[pulsar] 04/04: [cleanup][broker] Follow up on #16968 to restore some behavior in PersistentDispatcherMultipleConsumers class (#17018)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 80d2cc9f3f81430ea0804e4a3d5d6e346b5bb51c
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Wed Aug 10 19:11:15 2022 +0800

    [cleanup][broker] Follow up on #16968 to restore some behavior in PersistentDispatcherMultipleConsumers class (#17018)
---
 .../persistent/PersistentDispatcherMultipleConsumers.java | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4887f6b0541..ea3b9bbe540 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
-    protected volatile boolean sendInProgress;
+    protected boolean sendInProgress;
     protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
             TOTAL_AVAILABLE_PERMITS_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
@@ -248,8 +248,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
      * We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
      *
      */
-    public void readMoreEntiresAsync() {
-        topic.getBrokerService().executor().execute(() -> readMoreEntries());
+    public void readMoreEntriesAsync() {
+        topic.getBrokerService().executor().execute(this::readMoreEntries);
     }
 
     public synchronized void readMoreEntries() {
@@ -295,7 +295,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // next entries as readCompletedEntries-callback was never called
                 if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
-                    readMoreEntiresAsync();
+                    readMoreEntriesAsync();
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 if (log.isDebugEnabled()) {
@@ -550,6 +550,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
             // setting sendInProgress here, because sendMessagesToConsumers will be executed
             // in a separate thread, and we want to prevent more reads
+            sendInProgress = true;
             dispatchMessagesThread.execute(safeRun(() -> {
                 if (sendMessagesToConsumers(readType, entries)) {
                     readMoreEntries();
@@ -557,7 +558,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             }));
         } else {
             if (sendMessagesToConsumers(readType, entries)) {
-                readMoreEntiresAsync();
+                readMoreEntriesAsync();
             }
         }
     }
@@ -923,7 +924,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
                 && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
             log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
-            readMoreEntiresAsync();
+            readMoreEntriesAsync();
         }
 
         int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
@@ -946,7 +947,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                readMoreEntiresAsync();
+                readMoreEntriesAsync();
             }
         }
         // increment broker-level count


[pulsar] 01/04: [fix][rest] Fix swagger shows lookup API path error. (#17013)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 17894f1625ab4ec3588902125d13cb3c81a0d94b
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Wed Aug 10 13:45:18 2022 +0800

    [fix][rest] Fix swagger shows lookup API path error. (#17013)
---
 pulsar-broker/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index afe66cc658c..21249c9994b 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -690,7 +690,7 @@
                   <springmvc>false</springmvc>
                   <locations>org.apache.pulsar.broker.lookup.v2</locations>
                   <schemes>http,https</schemes>
-                  <basePath>/lookup/v2</basePath>
+                  <basePath>/lookup</basePath>
                   <info>
                     <title>Pulsar Lookup REST API</title>
                     <version>v2</version>


[pulsar] 02/04: [fix][broker]remove exception log when access status.html (#17025)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dad4db16b4c4b2cb57d1a42c80a151c938b2629e
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Aug 10 13:46:12 2022 +0800

    [fix][broker]remove exception log when access status.html (#17025)
---
 .../main/java/org/apache/pulsar/common/configuration/VipStatus.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
index a0bd7a35bce..dcde2d02dd5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
@@ -26,10 +26,12 @@ import javax.ws.rs.Path;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response.Status;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Web resource used by the VIP service to check to availability of the service instance.
  */
+@Slf4j
 @Path("/status.html")
 public class VipStatus {
 
@@ -40,7 +42,6 @@ public class VipStatus {
     protected ServletContext servletContext;
 
     @GET
-    @Context
     public String checkStatus() {
         String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
         @SuppressWarnings("unchecked")
@@ -54,6 +55,7 @@ public class VipStatus {
                 return "OK";
             }
         }
+        log.warn("Failed to access \"status.html\". The service is not ready");
         throw new WebApplicationException(Status.NOT_FOUND);
     }