Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/30 11:09:01 UTC

[GitHub] [pulsar] eolivelli opened a new pull request, #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

eolivelli opened a new pull request, #16304:
URL: https://github.com/apache/pulsar/pull/16304

   ### Motivation
   
   In `consumerFlow` for Shared subscriptions (`PersistentDispatcherMultipleConsumers`), we call `readMoreEntries` directly, so it runs on the `pulsar-io` thread pool.
   
   In `PersistentDispatcherSingleActiveConsumer` (Failover/Exclusive), we defer the execution of that code to another thread pool.
   
   @dave2wave reported that, in OpenMessaging Benchmark (OMB) tests with offloaders, Shared subscriptions perform much worse.
   
   ### Modifications
   
   Move the execution of `readMoreEntries` off the `pulsar-io` thread and onto the topic thread.
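A minimal sketch of the "run it on the topic thread" idea, assuming a hypothetical `OrderedFlowSketch` class built only on `java.util.concurrent` (it is not Pulsar's or BookKeeper's ordered executor): each key, such as a topic name, is hashed to one single-threaded executor, so flow requests for the same topic run in order on one worker thread instead of on the thread that received the request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedFlowSketch {
    // One single-threaded executor per slot; a given key always maps to the same slot.
    private final List<ExecutorService> workers = new ArrayList<>();

    public OrderedFlowSketch(int numThreads, String namePrefix) {
        for (int i = 0; i < numThreads; i++) {
            final String name = namePrefix + "-" + i;
            workers.add(Executors.newSingleThreadExecutor(r -> new Thread(r, name)));
        }
    }

    // Same key -> same thread -> tasks for that key run in submission order.
    public void executeOrdered(Object key, Runnable task) {
        int slot = Math.floorMod(key.hashCode(), workers.size());
        workers.get(slot).execute(task);
    }

    public void shutdownAndWait() {
        workers.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService w : workers) {
                w.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Simulates several flow requests for one topic: the caller (e.g. an IO thread)
    // only enqueues the work; the flow handling itself runs on the topic's worker.
    public static List<String> demo() {
        OrderedFlowSketch executor = new OrderedFlowSketch(4, "topic-workers");
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        String topic = "persistent://public/default/my-topic";
        for (int i = 0; i < 5; i++) {
            final int request = i;
            executor.executeOrdered(topic, () ->
                    seen.add(Thread.currentThread().getName() + ":" + request));
        }
        executor.shutdownAndWait();
        return seen;
    }
}
```

Because every task for a given key lands on the same single-threaded executor, per-topic ordering is preserved without blocking the caller, while different topics can still proceed in parallel on other workers.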
   
   ### Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1175859712

   /pulsarbot rerun-failure-checks


[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1176094870

   Actually, the read from the offloader happens in another thread.
   But we are still contending on the Dispatcher instance from a shared thread pool used for broker IO.
   
   This patch doesn't bring much of a performance improvement, but it is a good cleanup that simplifies how threading works for the Dispatcher. I was surprised when I found this difference.


[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1173760419

   @dave2wave reported that this patch brings a small improvement, but not much.
   
   BTW, I think it is better to move this `synchronized` part out of the main `pulsar-io` thread pool.
   


[GitHub] [pulsar] hangc0276 commented on a diff in pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#discussion_r913091092


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -205,7 +206,13 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
     }
 
     @Override
-    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
+            internalConsumerFlow(consumer, additionalNumberOfMessages);
+        }));
+    }
+
+    private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {

Review Comment:
   If the same thread executes `internalConsumerFlow` in order, do we still need `synchronized`?



[GitHub] [pulsar] eolivelli commented on a diff in pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#discussion_r913091969


Review Comment:
   This is still required, because we are accessing shared variables.
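A sketch of why the lock can still matter, assuming a simplified, hypothetical dispatcher with a single `totalAvailablePermits` counter: `internalConsumerFlow` always runs on one executor thread, but another method such as `removeConsumer` (also `synchronized` in the hunk header quoted above) can be called from a different thread, so both must guard the shared field with the same monitor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedDispatcherSketch {
    // Shared state touched by more than one thread.
    private int totalAvailablePermits = 0;

    // In this sketch, always runs on the single "topic" executor thread.
    private synchronized void internalConsumerFlow(int additionalPermits) {
        totalAvailablePermits += additionalPermits;
    }

    // Called from a different thread, not through the topic executor.
    public synchronized void removeConsumer(int consumerPermits) {
        totalAvailablePermits -= consumerPermits;
    }

    public synchronized int getTotalAvailablePermits() {
        return totalAvailablePermits;
    }

    // Adds and removes the same number of permits from two different threads.
    public static int demo(int iterations) {
        SynchronizedDispatcherSketch dispatcher = new SynchronizedDispatcherSketch();
        ExecutorService topicThread = Executors.newSingleThreadExecutor();
        ExecutorService otherThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < iterations; i++) {
            topicThread.execute(() -> dispatcher.internalConsumerFlow(1));
            otherThread.execute(() -> dispatcher.removeConsumer(1));
        }
        topicThread.shutdown();
        otherThread.shutdown();
        try {
            topicThread.awaitTermination(30, TimeUnit.SECONDS);
            otherThread.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Both mutators hold the same monitor, so no update is lost: the net result is 0.
        return dispatcher.getTotalAvailablePermits();
    }
}
```

If `synchronized` were dropped from either mutator, the two threads could interleave their read-modify-write updates and lose some of them, so the result would no longer be guaranteed to be 0. Running one method on a dedicated thread only serializes that method against itself.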



[GitHub] [pulsar] eolivelli merged pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #16304:
URL: https://github.com/apache/pulsar/pull/16304


[GitHub] [pulsar] eolivelli commented on a diff in pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#discussion_r913092154


Review Comment:
   Maybe keeping it won't add any cost.



[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1178928285

   @hangc0276 @codelipenghui thanks for your review. I have simplified the patch.
   There is no need for an additional review, but I wanted to let you know that the patch is now a little different.
   
   The last change accounts for the fact that this class does not use the topic ordered executor, so I have updated the code to use the same thread pool we use when calling `readMoreEntries()`.


Re: [PR] [fix][broker] Do not use IO thread for consumerFlow in Shared subscription [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-2045850043

   @eolivelli It looks like this does not change anything: `BrokerService#executor` also returns the I/O thread pool. Is there some other mechanism at work here?


[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1171085785

   @hangc0276 here are more details about `BlobStoreBackedReadHandleImpl`.
   
   This is where the read happens (more precisely, where it is “triggered”) for the Shared subscription (`PersistentDispatcherMultipleConsumers`):
   
   ```
   2022-06-30T12:23:19,689+0200 [pulsar-io-18-21] INFO  org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl - Ledger 0: reading 95 - 95
   java.lang.Exception: Reading 95 95
   at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.readAsync(BlobStoreBackedReadHandleImpl.java:105) ~[?:?]
   at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:211) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:188) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1982) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncReadEntry$24(ManagedLedgerImpl.java:1899) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
   at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
   at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1899) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReplayEntries$11(ManagedCursorImpl.java:1413) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) [?:?]
   at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) [?:?]
   at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2739) [?:?]
   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) [?:?]
   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) [?:?]
   at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) [?:?]
   at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) [?:?]
   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) [?:?]
   at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) [?:?]
   at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1407) [managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:393) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:252) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.consumerFlow(PersistentDispatcherMultipleConsumers.java:224) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentSubscription.consumerFlow(PersistentSubscription.java:363) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.Consumer.flowPermits(Consumer.java:691) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.ServerCnx.handleFlow(ServerCnx.java:1583) [pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   ```
   
    
   
   And this is the Failover subscription (`PersistentDispatcherSingleActiveConsumer`):
   
   ```
   2022-06-30T12:25:07,666+0200 [broker-topic-workers-OrderedExecutor-3-0] INFO  org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl - Ledger 0: reading 195 - 237
   java.lang.Exception: Reading 195 237
   at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.readAsync(BlobStoreBackedReadHandleImpl.java:105) ~[?:?]
   at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:292) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:242) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1997) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalReadFromLedger(ManagedLedgerImpl.java:1969) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncReadEntries$17(ManagedLedgerImpl.java:1779) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
   at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
   at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
   at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:1779) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntries(ManagedCursorImpl.java:721) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntries(ManagedCursorImpl.java:704) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:855) ~[managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:363) ~[pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalConsumerFlow(PersistentDispatcherSingleActiveConsumer.java:285) ~[pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$consumerFlow$5(PersistentDispatcherSingleActiveConsumer.java:261) ~[pulsar-broker.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger.jar:2.10.1.1-SNAPSHOT]
   at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.14.5.1.0.1.jar:4.14.5.1.0.1]
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.77.Final.jar:4.1.77.Final]
   at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
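The `java.lang.Exception: Reading ...` lines suggest these traces were captured by logging a throwaway exception together with the thread name, a common way to find out which thread pool triggers a call. A minimal JDK-only sketch of that technique, where `ReadTraceSketch` and its methods are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReadTraceSketch {
    // Hypothetical helper: "[thread-name] message" followed by the current stack,
    // similar in spirit to logging `new Exception("Reading " + first + " " + last)`.
    public static String describeCaller(String message) {
        StringBuilder sb = new StringBuilder();
        sb.append('[').append(Thread.currentThread().getName()).append("] ").append(message);
        for (StackTraceElement frame : new Exception(message).getStackTrace()) {
            sb.append("\n\tat ").append(frame);
        }
        return sb.toString();
    }

    // Runs the "read" on a named thread so the output shows which pool triggered it.
    public static String readOnNamedThread(String threadName) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            Future<String> result = pool.submit(() -> describeCaller("Reading 95 95"));
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In the traces above, the thread name on the first line (`pulsar-io-...` versus `broker-topic-workers-OrderedExecutor-...`) is what distinguishes where each dispatcher triggers the offloaded read.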


[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1171086368

   We are doing more performance testing with @dave2wave. I will mark this patch as "ready" once we have full confirmation of the benefits.


[GitHub] [pulsar] eolivelli commented on pull request #16304: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #16304:
URL: https://github.com/apache/pulsar/pull/16304#issuecomment-1176143082

   > And it's better to have test data with this new change to show that it really resolves the problem
   
   @dave2wave confirmed that, in his testing with the OpenMessaging Benchmark, this patch is a small win.
   
   So @codelipenghui, I believe this change is worth doing.
   
   I am sending other patches in this area to improve Shared subscriptions with offloaders.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org