You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/04 05:05:59 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

codelipenghui opened a new pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824


   Fixes #10813
   The issue is introduced by #7266, it only affects the master branch.
   
   ### Motivation
   
   1. Add wrapperOffset to make sure get the correct batch size from the metadata
   2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer
      it should be (messages / avgBatchSizePerMsg)
   
   ### Verifying this change
   
        * The test case is to simulate dispatch batches with different batch size to the consumer.
        * 1. The consumer has 1000 available permits
        * 2. The producer send batches with different batch size
        *
        * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645310069



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       I'm trying to understand the logic beyond the changes made in this PR. I have a question about the calculations.
   
   A few lines below, there is this line of code on line 545:
   ```
                   TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                           -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
   ```
   
   Just wondering if  `batchIndexesAcks.getTotalAckedIndexCount()` should impact the calculation `totalMessages -= msgSent`. Why does it impact `totalAvailablePermits` field (updated with `TOTAL_AVAILABLE_PERMITS_UPDATER` )?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645309756



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       The naming of `totalMessages` seems a bit confusing. Isn't it more about `remainingMessages`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854521907


   The fix makes sense to me.
   but unfortunately but test case is still failing


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645298702



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -114,9 +114,11 @@ public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List
                 continue;
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[i] != null
-                    ? entryWrapper.get()[i].getMetadata()
+            int entryWrapperIndex = i + entryWrapperOffset;
+            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
+                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
                     : null;
+            System.out.println(msgMetadata.getNumMessagesInBatch());

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645459380



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTests.java
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker-impl")
+public class DispatchAccordingPermitsTests extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * The test case is to simulate dispatch batches with different batch size to the consumer.
+     * 1. The consumer has 1000 available permits
+     * 2. The producer send batches with different batch size
+     *
+     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
+     */
+    @Test
+    public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException {

Review comment:
       @eolivelli I have merged the PR, will try to investigate the problem with the partitioned topic. I have test 100 partitions on the standalone, seem not able to reproduce the issue. Will try to deploy to a real cluster for testing. Will try to write a UT for partitioned case if it can be reproduced




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645358257



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       change to remainingMessages




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645298702



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -114,9 +114,11 @@ public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List
                 continue;
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[i] != null
-                    ? entryWrapper.get()[i].getMetadata()
+            int entryWrapperIndex = i + entryWrapperOffset;
+            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
+                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
                     : null;
+            System.out.println(msgMetadata.getNumMessagesInBatch());

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854408822


   I have tested the fix. The situation looks better, but all messages don't get delivered from the backlog to the consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645425031



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTests.java
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker-impl")
+public class DispatchAccordingPermitsTests extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * The test case is to simulate dispatch batches with different batch size to the consumer.
+     * 1. The consumer has 1000 available permits
+     * 2. The producer send batches with different batch size
+     *
+     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
+     */
+    @Test
+    public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException {

Review comment:
       the main difference between my test and this test is that here we do not have a partitioned topic.
   
   a good follow up work is to add a test for partitioned topics 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854524938


   let's merge the patch as soon as CI is green


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854406842


   The change makes sense to me.
   Let me test it locally and see if my case is solved.
   
   Thank you very much !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645354985



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       Because the consume will ignore the acked index.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645299661



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -114,9 +114,11 @@ public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List
                 continue;
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[i] != null
-                    ? entryWrapper.get()[i].getMetadata()
+            int entryWrapperIndex = i + entryWrapperOffset;
+            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
+                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
                     : null;
+            System.out.println(msgMetadata.getNumMessagesInBatch());

Review comment:
       Forgotten System.out debug logging?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854606784


   @codelipenghui I would like to add that with this fix my test arrived to 99.85% !
   it is a good improvement BTW
   
   thanks !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854408822


   I have tested the fix. The situation looks better, but all messages don't get delivered from the backlog to the consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645354985



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       Because the consume will ignore the acked index.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       change to remainingMessages




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on a change in pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#discussion_r645299661



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -114,9 +114,11 @@ public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List
                 continue;
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[i] != null
-                    ? entryWrapper.get()[i].getMetadata()
+            int entryWrapperIndex = i + entryWrapperOffset;
+            MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
+                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
                     : null;
+            System.out.println(msgMetadata.getNumMessagesInBatch());

Review comment:
       Forgotten System.out debug logging?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       The naming of `totalMessages` seems a bit confusing. Isn't it more about `remainingMessages`?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
                 EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
-                        sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
+                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
+                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
 
                 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
 
                 int msgSent = sendMessageInfo.getTotalMessages();
+                totalMessages -= msgSent;

Review comment:
       I'm trying to understand the logic beyond the changes made in this PR. I have a question about the calculations.
   
   A few lines below, there is this line of code on line 545:
   ```
                   TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                           -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
   ```
   
   Just wondering if  `batchIndexesAcks.getTotalAckedIndexCount()` should impact the calculation `totalMessages -= msgSent`. Why does it impact `totalAvailablePermits` field (updated with `TOTAL_AVAILABLE_PERMITS_UPDATER` )?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on pull request #10824: Fix consumer stuck issue due to reuse entry wrapper.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10824:
URL: https://github.com/apache/pulsar/pull/10824#issuecomment-854406842


   The change makes sense to me.
   Let me test it locally and see if my case is solved.
   
   Thank you very much !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org