You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/04/15 23:00:31 UTC
[pulsar] branch master updated: [pulsar-broker] avoid backpressure
by skipping dispatching if consumer channel is not writable (#6740)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1fd1b2b [pulsar-broker] avoid backpressure by skipping dispatching if consumer channel is not writable (#6740)
1fd1b2b is described below
commit 1fd1b2b440af2477f916999a67752f9f532d1620
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Apr 15 16:00:14 2020 -0700
[pulsar-broker] avoid backpressure by skipping dispatching if consumer channel is not writable (#6740)
### Motivation
Recently we have been seeing the broker crash with an OutOfMemoryError when it has a high dispatch rate with large messages. A high outbound message rate saturates the network, and the broker keeps writing to the channel even though it is not writable. Those writes are buffered in memory until the broker eventually runs out of memory and shuts down with the error below:
```
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
```
### Modification
To relieve backpressure, the broker should slow down dispatching when the consumer's connection (cnx) channel is not writable. The broker already does this for the replicator and for Exclusive consumers, but not for Shared consumers. This change adds a similar check to the shared-subscription dispatchers (including the sticky-key dispatcher) to avoid OOM at high dispatch rates. It may also help with #5896.
---
.../PersistentDispatcherMultipleConsumers.java | 27 +++++++++++++++++++++-
...istentStickyKeyDispatcherMultipleConsumers.java | 7 +++++-
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4c9aef1..74d131f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -254,6 +254,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
+ if (!isConsumerWritable()) {
+ // If the connection is not currently writable, we issue the read request anyway, but for a single
+ // message. The intent here is to keep use the request as a notification mechanism while avoiding to
+ // read and dispatch a big batch of messages which will need to wait before getting written to the
+ // socket.
+ messagesToRead = 1;
+ }
+
// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
@@ -476,8 +484,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
// round-robin dispatch batch size for this consumer
+ int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
+ if (log.isDebugEnabled() && !c.isWritable()) {
+ log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
+ c);
+ }
int messagesForC = Math.min(
- Math.min(entriesToDispatch, c.getAvailablePermits()),
+ Math.min(entriesToDispatch, availablePermits),
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
if (messagesForC > 0) {
@@ -614,6 +627,18 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return false;
}
+ private boolean isConsumerWritable() {
+ for (Consumer consumer : consumerList) {
+ if (consumer.isWritable()) {
+ return true;
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] consumer is not writable", topic.getName(), name);
+ }
+ return false;
+ }
+
@Override
public boolean isConsumerAvailable(Consumer consumer) {
return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 248f45a..c651782 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -92,7 +92,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return;
}
- int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
+ int availablePermits = consumer.isWritable() ? consumer.getAvailablePermits() : 1;
+ if (log.isDebugEnabled() && !consumer.isWritable()) {
+ log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
+ consumer);
+ }
+ int messagesForC = Math.min(entriesWithSameKey.getValue().size(), availablePermits);
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);