Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/27 08:08:42 UTC

[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16812: Issue 16802: fix Repeated messages of shared dispatcher

BewareMyPower commented on code in PR #16812:
URL: https://github.com/apache/pulsar/pull/16812#discussion_r930749709


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -528,135 +533,143 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
             log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
         }
 
+        // dispatch messages to a separate thread, but still in order for this subscription
+        // sendMessagesToConsumers is responsible for running broker-side filters
+        // that may be quite expensive
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
-            // dispatch messages to a separate thread, but still in order for this subscription
-            // sendMessagesToConsumers is responsible for running broker-side filters
-            // that may be quite expensive
+            // setting sendInProgress here, because sendMessagesToConsumers will be executed
+            // in a separate thread, and we want to prevent more reads
+            sendInProgress = true;
             dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
         } else {
             sendMessagesToConsumers(readType, entries);
         }
     }
 
     protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
-
-        if (needTrimAckedMessages()) {
-            cursor.trimDeletedEntries(entries);
-        }
-
-        int entriesToDispatch = entries.size();
-        // Trigger read more messages
-        if (entriesToDispatch == 0) {
-            readMoreEntries();
-            return;
-        }
-        final MessageMetadata[] metadataArray = entries.stream()
-                .map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
-                .toArray(MessageMetadata[]::new);
-        int remainingMessages = Stream.of(metadataArray).filter(Objects::nonNull)
-                .map(MessageMetadata::getNumMessagesInBatch)
-                .reduce(0, Integer::sum);
-
-        int start = 0;
-        long totalMessagesSent = 0;
-        long totalBytesSent = 0;
-        long totalEntries = 0;
-        int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;
-
-        int firstAvailableConsumerPermits, currentTotalAvailablePermits;
-        boolean dispatchMessage;
-        while (entriesToDispatch > 0) {
-            firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
-            currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
-            dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
-            if (!dispatchMessage) {
-                break;
+        sendInProgress = true;
+        try {
+            if (needTrimAckedMessages()) {

Review Comment:
   Could you move the body of this `try` block into a separate method so the diff stays small? For example:
   
   ```java
       private void trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
           /* ... */
       }
   
       protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
           sendInProgress = true;
           try {
               trySendMessagesToConsumers(readType, entries);
           } finally {
               sendInProgress = false;
           }
           readMoreEntries();
       }
   ```
   
   In addition, the following original code can be removed: the `finally` block already resets `sendInProgress` to false, and `readMoreEntries` is called right after it.
   
   ```java
               if (entriesToDispatch == 0) {
                   sendInProgress = false;
                   readMoreEntries();
                   return;
               }
   ```
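   
   To illustrate the suggestion, here is a minimal, self-contained sketch. `DispatcherSketch`, its `String` entries, and the `readMoreCalls` counter are hypothetical stand-ins and not the real Pulsar classes; only the `sendInProgress` / `try` / `finally` / `readMoreEntries` shape is taken from the snippet above.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical stand-in for the dispatcher; NOT the real Pulsar class.
   class DispatcherSketch {
       private boolean sendInProgress;
       private int readMoreCalls; // assumption: counts reads that were actually triggered
       private final List<String> delivered = new ArrayList<>();

       // Holds the original dispatch logic. It may return early or throw
       // without ever touching the sendInProgress flag.
       private void trySendMessagesToConsumers(List<String> entries) {
           if (entries.isEmpty()) {
               return; // early return: the caller resets the flag and reads more
           }
           for (String entry : entries) {
               if (entry == null) {
                   throw new IllegalStateException("corrupt entry");
               }
               delivered.add(entry);
           }
       }

       synchronized void sendMessagesToConsumers(List<String> entries) {
           sendInProgress = true;
           try {
               trySendMessagesToConsumers(entries);
           } finally {
               sendInProgress = false; // reset on every exit path
           }
           readMoreEntries();
       }

       // Skips the read while a send is still in progress.
       synchronized void readMoreEntries() {
           if (sendInProgress) {
               return;
           }
           readMoreCalls++;
       }

       boolean isSendInProgress() { return sendInProgress; }
       int readMoreCalls() { return readMoreCalls; }
       List<String> delivered() { return delivered; }

       public static void main(String[] args) {
           DispatcherSketch d = new DispatcherSketch();
           d.sendMessagesToConsumers(Arrays.asList("a", "b"));
           d.sendMessagesToConsumers(new ArrayList<>()); // empty batch still triggers a read
           System.out.println(d.delivered() + " reads=" + d.readMoreCalls()
                   + " inProgress=" + d.isSendInProgress());
       }
   }
   ```
   
   In this sketch the empty-batch case needs no special handling, since the early return falls through to `finally` and then to `readMoreEntries()`. If the inner method throws, the flag is still reset, but the exception propagates and `readMoreEntries()` is not reached in that call.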


