You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:55 UTC

[pulsar] 09/38: [pulsar-broker] avoid backpressure by skipping dispatching if consumer channel is not writable (#6740)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0d5d0c1559d695e870aec1feb970b5fb01e8f2b7
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Apr 15 16:00:14 2020 -0700

    [pulsar-broker] avoid backpressure by skipping dispatching if consumer channel is not writable (#6740)
    
    ### Motivation
    Recently we are seeing broker is crashing with OutOfMemory when it has higher dispatch rate with large size messages. High message rate out saturates network and broker will try to write on the channel which is not writable which buffers the message and eventually broker sees OOM and shutdown with below error:
    ```
    java.lang.OutOfMemoryError: Direct buffer memory
            at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
            at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
            at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
            at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
            at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
            at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
            at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
            at java.lang.Thread.run(Thread.java:834) [?:?]
    ```
    
    ### Modification
    In order to reduce backpressure, broker should slow down dispatching if consumer cnx-channel is writable. Broker does it for replicator and Exclusive consumer but not doing for shared consumer. So, add similar check for shared subscription to avoid OOM for high dispatch rate. It might be helpful for #5896 as well.
    (cherry picked from commit 1fd1b2b440af2477f916999a67752f9f532d1620)
---
 .../PersistentDispatcherMultipleConsumers.java     | 27 +++++++++++++++++++++-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  7 +++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 08a3a01..b69ee94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -255,6 +255,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
             int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
 
+            if (!isConsumerWritable()) {
+                // If the connection is not currently writable, we issue the read request anyway, but for a single
+                // message. The intent here is to keep use the request as a notification mechanism while avoiding to
+                // read and dispatch a big batch of messages which will need to wait before getting written to the
+                // socket.
+                messagesToRead = 1;
+            }
+
             // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
             // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
@@ -481,8 +489,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             }
 
             // round-robin dispatch batch size for this consumer
+            int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
+            if (log.isDebugEnabled() && !c.isWritable()) {
+                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
+                        c);
+            }
             int messagesForC = Math.min(
-                    Math.min(entriesToDispatch, c.getAvailablePermits()),
+                    Math.min(entriesToDispatch, availablePermits),
                     serviceConfig.getDispatcherMaxRoundRobinBatchSize());
 
             if (messagesForC > 0) {
@@ -619,6 +632,18 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         return false;
     }
 
+    private boolean isConsumerWritable() {
+        for (Consumer consumer : consumerList) {
+            if (consumer.isWritable()) {
+                return true;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] consumer is not writable", topic.getName(), name);
+        }
+        return false;
+    }
+
     @Override
     public boolean isConsumerAvailable(Consumer consumer) {
         return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index b8b85ab..a0b23b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -92,7 +92,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 return;
             }
 
-            int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
+            int availablePermits = consumer.isWritable() ? consumer.getAvailablePermits() : 1;
+            if (log.isDebugEnabled() && !consumer.isWritable()) {
+                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
+                        consumer);
+            }
+            int messagesForC = Math.min(entriesWithSameKey.getValue().size(), availablePermits);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
                         name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);