Posted to dev@activemq.apache.org by franz1981 <gi...@git.apache.org> on 2017/03/22 14:49:34 UTC

[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

GitHub user franz1981 opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1119

    ARTEMIS-1025 OutOfDirectMemoryError raised from Netty

    It solves the OOM issue under heavy load by:
    - allowing the NettyConnection to perform a (configurable) weak form of backpressure on unbatched "big" messages
    - allowing multiple (concurrent) clients to query the memory usage of the connection
    - allowing batching without involving further copies of the data
    - providing detailed memory load information on OOM errors

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/franz1981/activemq-artemis oom_netty_batch_backpressure

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1119.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1119
    
----
commit a421cd5b19e95feddfbb4b35e423591617e86283
Author: Francesco Nigro <ni...@gmail.com>
Date:   2017-03-15T13:52:52Z

    ARTEMIS-1025 OutOfDirectMemoryError raised from Netty

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109327492
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
     
           this.listener = listener;
     
    +      this.directDeliver = directDeliver;
    +
           this.batchingEnabled = batchingEnabled;
     
    -      this.directDeliver = directDeliver;
    +      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
    +
    +      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    +   }
    +
    +   private static void waitFor(ChannelPromise promise, long millis) {
    +      try {
    +         final boolean completed = promise.await(millis);
    +         if (!completed) {
    +            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
    +         }
    +      } catch (InterruptedException e) {
    +         throw new ActiveMQInterruptedException(e);
    +      }
    +   }
    +
    +   /**
    +    * Returns an estimation of the current size of the write buffer in the channel.
    +    * To obtain a more precise value is necessary to use the unsafe API of the channel to
    +    * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
    +    * Anyway, both these values are subject to concurrent modifications.
    +    */
    +   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
    +      //Channel::bytesBeforeUnwritable is performing a volatile load
    +      //this is the reason why writeBufferHighWaterMark is passed as an argument
    +      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
    +      assert bytesBeforeUnwritable >= 0;
    +      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
    +      assert writtenBytes >= 0;
    +      return writtenBytes;
    +   }
    +
    +   /**
    +    * When batching is not enabled, it tries to back-pressure the caller thread.
    +    * The back-pressure provided is not before the writeAndFlush request, buf after it: too many threads that are not
    +    * using {@link Channel#isWritable} to know when push unbatched data will risk to cause OOM due to the enqueue of each own {@link Channel#writeAndFlush} requests.
    +    * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it will block {@link Channel#isWritable} to be true.
    +    */
    +   private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
    +                                                           final int readableBytes,
    +                                                           final Channel channel,
    +                                                           final ChannelPromise promise) {
    +      final ChannelFuture future;
    +      if (!channel.isWritable()) {
    +         final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
    +         future = channel.writeAndFlush(bytes, channelPromise);
    +         //is the channel is not writable wait the current request to be flushed, providing backpressuring on the caller thread
    +         if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
    +            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    --- End diff --
    
    >> but I'm worried it could lead to a storm of users complaining that while sending 1 EB <<
    
    *We* are the users in this case...
    
    If a user is sending a huge message, it's our job to break it up into smaller packets...
    
    In that case, we never block inside the NettyConnector... we check isWritable() and stop writing...
    
    I don't think we need to be very specific about the capacity... we just put the bytes and resume whatever was happening before.
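
The "check isWritable() and stop writing" approach described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `WritableSink`, `ChunkedWriter` and `BoundedSink` are hypothetical stand-ins invented for this sketch, not Netty or Artemis types, and the real code would work against a Netty `Channel`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the small part of a channel this sketch needs.
interface WritableSink {
   boolean isWritable();

   void write(byte[] chunk);
}

final class ChunkedWriter {

   // Writes 'message' in chunks of at most 'chunkSize' bytes, starting at 'offset'.
   // Stops as soon as the sink reports it is not writable and returns the new offset,
   // so the caller can resume later instead of blocking.
   static int writeWhileWritable(WritableSink sink, byte[] message, int offset, int chunkSize) {
      int position = offset;
      while (position < message.length && sink.isWritable()) {
         final int length = Math.min(chunkSize, message.length - position);
         final byte[] chunk = new byte[length];
         System.arraycopy(message, position, chunk, 0, length);
         sink.write(chunk);
         position += length;
      }
      return position;
   }

   // Toy sink: becomes unwritable once the pending bytes reach the high water mark.
   static final class BoundedSink implements WritableSink {
      private final int highWaterMark;
      private int pending;
      final List<byte[]> chunks = new ArrayList<>();

      BoundedSink(int highWaterMark) {
         this.highWaterMark = highWaterMark;
      }

      @Override
      public boolean isWritable() {
         return pending < highWaterMark;
      }

      @Override
      public void write(byte[] chunk) {
         chunks.add(chunk);
         pending += chunk.length;
      }

      // Simulates the transport having flushed everything that was pending.
      void drain() {
         pending = 0;
      }
   }

   public static void main(String[] args) {
      final BoundedSink sink = new BoundedSink(8);
      final byte[] message = new byte[20];
      int offset = writeWhileWritable(sink, message, 0, 4);
      System.out.println("first pass stopped at offset " + offset);
      sink.drain();
      offset = writeWhileWritable(sink, message, offset, 4);
      System.out.println("second pass stopped at offset " + offset);
   }
}
```

The caller keeps the returned offset and resumes once the sink is writable again, so no thread ever waits inside the write path.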
    




[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    It doesn't unblock once blocked.  That's the issue.  
    
    
    
    
    It wasn't supposed to block in the first place.  It should avoid it before it blocks.  
    
    Let's talk about this later today.  



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    I see what you're doing now... if !isWritable, I think it should perform a flush and block the thread...
    
    
    We should always avoid actually calling write while isWritable...
    
    
    
    I think this needs some simplification though...
    
    
    i - just call a simple writeAndFlush if beyond the limit... you may add a log.debug (**debug**) statement if it needed to flush because of the lack of flow control in our codebase. We can use a fixed timeout here... and log.warn if it is missed. We have a callback timeout already... we should just reuse the same one. (The fewer properties to be configured, the better.)
    
    ii - that means no more crazy properties.
    
    
    We may talk on Monday about this... but this is taking shape for me now... we just need to clean up the PR before we merge it.
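
A rough sketch of the "fixed timeout, debug when flushing, warn when missed" idea from point i. It uses a plain `CompletableFuture` and `java.util.logging` in place of Netty's `ChannelFuture` and the Artemis logger; `TimedFlush` and `awaitFlush` are hypothetical names used only for illustration, not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class TimedFlush {

   private static final Logger LOG = Logger.getLogger(TimedFlush.class.getName());

   // Waits up to 'timeoutMillis' for a pending flush and reports whether it completed in time.
   // 'flushCompletion' stands in for the future returned by a writeAndFlush call.
   static boolean awaitFlush(CompletableFuture<Void> flushCompletion, long timeoutMillis) {
      // debug level: the flush was forced because callers did not apply flow control
      LOG.log(Level.FINE, "write buffer beyond its limit, waiting for the flush");
      try {
         flushCompletion.get(timeoutMillis, TimeUnit.MILLISECONDS);
         return true;
      } catch (TimeoutException e) {
         // warn level: the fixed timeout was missed
         LOG.log(Level.WARNING, "flush not completed within {0} ms", timeoutMillis);
         return false;
      } catch (ExecutionException e) {
         LOG.log(Level.WARNING, "flush failed", e.getCause());
         return false;
      } catch (InterruptedException e) {
         // restore the interrupt flag so the caller can react to it
         Thread.currentThread().interrupt();
         return false;
      }
   }

   public static void main(String[] args) {
      System.out.println(awaitFlush(CompletableFuture.completedFuture(null), 100));
      System.out.println(awaitFlush(new CompletableFuture<>(), 10));
   }
}
```

Passing the timeout in as an argument lets the caller reuse an existing value, such as the callback timeout mentioned above, rather than introducing a new property.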



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109077316
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -46,40 +43,37 @@
     
     public class NettyConnection implements Connection {
     
    -   // Constants -----------------------------------------------------
    -   private static final int BATCHING_BUFFER_SIZE = 8192;
    -
    -   // Attributes ----------------------------------------------------
    +   private static final int DEFAULT_MTU_BYTES = Integer.getInteger("io.netty.mtu", 1460);
    +   //backpressure on unbatched writes is enabled by default
    --- End diff --
    
    Any form of configuration here would have to be done through connection properties... in this case, through the URI that specifies the connector...
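
To illustrate what "through the URI" could look like, here is a minimal sketch that reads a setting from the query part of a connector URI instead of a JVM-wide system property. The parameter name `backPressureMillis` and the `ConnectorUriProperties` helper are hypothetical, made up for this example; they are not actual Artemis connector properties or APIs.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

final class ConnectorUriProperties {

   // Splits the query part of the URI into name/value pairs (no URL decoding, for brevity).
   static Map<String, String> parse(URI uri) {
      final Map<String, String> properties = new LinkedHashMap<>();
      final String query = uri.getQuery();
      if (query == null || query.isEmpty()) {
         return properties;
      }
      for (String pair : query.split("&")) {
         final int separator = pair.indexOf('=');
         if (separator > 0) {
            properties.put(pair.substring(0, separator), pair.substring(separator + 1));
         }
      }
      return properties;
   }

   // Returns the named property as a long, or 'defaultValue' when it is absent.
   static long longProperty(Map<String, String> properties, String name, long defaultValue) {
      final String value = properties.get(name);
      return value == null ? defaultValue : Long.parseLong(value);
   }

   public static void main(String[] args) {
      // 'backPressureMillis' is a hypothetical parameter name
      final URI uri = URI.create("tcp://localhost:61616?backPressureMillis=250");
      final Map<String, String> properties = parse(uri);
      System.out.println(longProperty(properties, "backPressureMillis", 1000L));
   }
}
```

Compared with a static `Integer.getInteger(...)` constant, a per-connector property can differ between connections in the same JVM.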
    
    
    I'm not saying I agree with these.. more to follow



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 Improve flow control on NettyConn...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @franz1981 Awesome work here Francesco!! thanks



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109248555
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
     
           this.listener = listener;
     
    +      this.directDeliver = directDeliver;
    +
           this.batchingEnabled = batchingEnabled;
     
    -      this.directDeliver = directDeliver;
    +      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
    +
    +      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    +   }
    +
    +   private static void waitFor(ChannelPromise promise, long millis) {
    +      try {
    +         final boolean completed = promise.await(millis);
    +         if (!completed) {
    +            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
    +         }
    +      } catch (InterruptedException e) {
    +         throw new ActiveMQInterruptedException(e);
    +      }
    +   }
    +
    +   /**
    +    * Returns an estimation of the current size of the write buffer in the channel.
    +    * To obtain a more precise value is necessary to use the unsafe API of the channel to
    +    * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
    +    * Anyway, both these values are subject to concurrent modifications.
    +    */
    +   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
    +      //Channel::bytesBeforeUnwritable is performing a volatile load
    +      //this is the reason why writeBufferHighWaterMark is passed as an argument
    +      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
    +      assert bytesBeforeUnwritable >= 0;
    +      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
    +      assert writtenBytes >= 0;
    +      return writtenBytes;
    +   }
    +
    +   /**
    +    * When batching is not enabled, it tries to back-pressure the caller thread.
    +    * The back-pressure provided is not before the writeAndFlush request, buf after it: too many threads that are not
    +    * using {@link Channel#isWritable} to know when push unbatched data will risk to cause OOM due to the enqueue of each own {@link Channel#writeAndFlush} requests.
    +    * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it will block {@link Channel#isWritable} to be true.
    +    */
    +   private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
    +                                                           final int readableBytes,
    +                                                           final Channel channel,
    +                                                           final ChannelPromise promise) {
    +      final ChannelFuture future;
    +      if (!channel.isWritable()) {
    +         final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
    +         future = channel.writeAndFlush(bytes, channelPromise);
    +         //is the channel is not writable wait the current request to be flushed, providing backpressuring on the caller thread
    +         if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
    +            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    --- End diff --
    
    I previously complained here that the log was at trace level, and that you were checking isTrace...
    
    This is probably a warn...
    
    
    If it isn't able to flush in time, the writing thread is probably busy with something else.
    
    
    Perhaps we should even do something more drastic eventually... such as closing the connection?
    
    
    For now a warn will do... (I may merge this and do it myself)



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic Does it hang on AMQP always, or only in the 50K case?
    If you're experiencing it in the 50K case, it is the effect of the backpressure needed to avoid OOM errors while [streaming huge messages](https://issues.apache.org/jira/browse/ARTEMIS-1036): it can be disabled with the property -Dio.netty.disable.backpressure=true, or reduced by lowering the default -Dio.netty.backpressure.millis=1000.
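
For reference, system properties of this kind are typically read with `Boolean.getBoolean` and `Long.getLong`. The sketch below assumes the two property names quoted above and assumes that 1000 ms is the default, as the comment suggests; `BackPressureSettings` is a hypothetical holder class, not the code in the PR.

```java
final class BackPressureSettings {

   static final String DISABLE_PROPERTY = "io.netty.disable.backpressure";
   static final String MILLIS_PROPERTY = "io.netty.backpressure.millis";
   // assumed default, taken from the value quoted in the comment above
   static final long ASSUMED_DEFAULT_MILLIS = 1000L;

   final boolean enabled;
   final long waitMillis;

   private BackPressureSettings(boolean enabled, long waitMillis) {
      this.enabled = enabled;
      this.waitMillis = waitMillis;
   }

   // Reads both -D properties; back-pressure stays enabled unless explicitly disabled.
   static BackPressureSettings fromSystemProperties() {
      final boolean disabled = Boolean.getBoolean(DISABLE_PROPERTY);
      final long waitMillis = Long.getLong(MILLIS_PROPERTY, ASSUMED_DEFAULT_MILLIS);
      return new BackPressureSettings(!disabled, waitMillis);
   }

   public static void main(String[] args) {
      final BackPressureSettings settings = fromSystemProperties();
      System.out.println("enabled=" + settings.enabled + " waitMillis=" + settings.waitMillis);
   }
}
```

Running with `-Dio.netty.backpressure.millis=250` would shorten the wait in this sketch, and `-Dio.netty.disable.backpressure=true` would turn it off.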



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic @jbertram Thanks guys!
    Np @clebertsuconic, another couple of eyes on it will be more than welcome: it is an impactful change, so it's better to have more than my 4 eyes on it :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950618
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    +      try {
    +         return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched && !flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) {
    -                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be enough precise with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    It's not a blocker for this merge... it was just a question... would it return inEventLoop if it's on the event loop for another connection?
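
As far as I understand Netty, `inEventLoop()` compares the calling thread with the thread owned by that specific event loop, so it would only return true on another connection's thread if both channels share the same event loop. The stdlib-only sketch below models that behaviour; `MiniEventLoop` is a hypothetical toy class, not Netty's `EventLoop`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MiniEventLoop implements AutoCloseable {

   private final ExecutorService executor;
   // the single thread that runs this loop's tasks, set when the executor creates it
   private volatile Thread loopThread;

   MiniEventLoop() {
      this.executor = Executors.newSingleThreadExecutor(runnable -> {
         final Thread thread = new Thread(runnable);
         thread.setDaemon(true);
         loopThread = thread;
         return thread;
      });
   }

   // True only when called from this loop's own thread.
   boolean inEventLoop() {
      return Thread.currentThread() == loopThread;
   }

   // Runs the task on this loop's thread and waits for its result.
   <T> T call(Callable<T> task) {
      try {
         return executor.submit(task).get();
      } catch (Exception e) {
         throw new IllegalStateException(e);
      }
   }

   @Override
   public void close() {
      executor.shutdownNow();
   }

   public static void main(String[] args) {
      try (MiniEventLoop acceptorLoop = new MiniEventLoop();
           MiniEventLoop replicationLoop = new MiniEventLoop()) {
         System.out.println("own loop: " + acceptorLoop.call(acceptorLoop::inEventLoop));
         System.out.println("other loop: " + acceptorLoop.call(replicationLoop::inEventLoop));
      }
   }
}
```

Under this model, a check based on the target channel's own event loop does not detect a caller that is running on a different connection's event loop thread.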



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950463
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    +      try {
    +         return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched && !flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) {
    -                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be enough precise with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    My concern is when you are in the event loop for another connection.
    
    The acceptor receives data, replication sends to the backup... it blocks... and the main receiver blocks!



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109967119
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    +      try {
    +         return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched && !flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) {
    -                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be enough precise with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    If you call it from within the event loop, it refuses to block, to avoid any risk of stalling Netty's I/O threads!
    Being "inEventLoop" only marks the caller thread as one that may perform channel and I/O operations for Netty.
    In general Netty tends to reuse the same thread(s) to perform operations on groups of connections/sockets (with EPOLL and NIO at least).
    The call ``inEventLoop`` can return ``true`` only if the current thread was created through the configured thread factory passed to Netty's bootstrap, regardless of what it is actually doing.
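
The guard described above can be sketched without Netty. This is only an illustration under stated assumptions: `EventLoopGuardSketch` and its single "sketch-io" thread are hypothetical stand-ins for a Netty event loop, and the thread-identity check is a simplification, not Netty's actual implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical stand-in for an event loop: one dedicated I/O thread.
final class EventLoopGuardSketch {
   private volatile Thread ioThread;
   private final ExecutorService loop;

   EventLoopGuardSketch() {
      // The factory remembers the thread it creates, so it can be recognised later.
      final ThreadFactory factory = runnable -> {
         final Thread thread = new Thread(runnable, "sketch-io");
         thread.setDaemon(true);
         ioThread = thread;
         return thread;
      };
      loop = Executors.newSingleThreadExecutor(factory);
   }

   // True only when the caller is the loop's own thread.
   boolean inEventLoop() {
      return Thread.currentThread() == ioThread;
   }

   // Blocking is refused on the I/O thread, allowed everywhere else.
   boolean isAllowedToBlock() {
      return !inEventLoop();
   }

   // Runs the same check from inside the loop thread and returns its answer.
   boolean checkFromLoop() {
      final Callable<Boolean> task = this::isAllowedToBlock;
      try {
         return loop.submit(task).get();
      } catch (Exception e) {
         throw new IllegalStateException(e);
      }
   }

   void shutdown() {
      loop.shutdown();
   }
}
```

A caller on any other thread is allowed to block, while the same check executed on the loop thread is refused.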



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    Considering that this is quite an impactful change, any feedback/review is more than welcome!



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 Improve flow control on Ne...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1119



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic Exactly, it tries to block for a finite amount of time and only if not in the event loop: you mentioned stalling behaviours, so I dug into the Netty code looking for a possible cause and found only one answer: maybe we share the I/O threads (via a pool?) with Netty and call ``writeAndFlush`` while on one of them.
    Although it is considered legal, IMHO it is something that must be avoided because it increases the risk of going OOM (it steals time from a thread that would be better spent doing I/O and releasing the sent ``ByteBuf``s) and it can lead to unexpected infinite stalls...
    It could be the key to solving other strange blocking behaviours happening on replication too!
    
    Answers:
    i - The original behaviour was already to perform a ``writeAndFlush`` there, but without applying any backpressure on the caller: streaming huge messages across clusters (1 GB per message) would log some warnings and then go OOM anyway.
    As you said, we need to perform flow control in our codebase, but the granularity that idiomatic Netty usage allows may not be enough: a "writable" flag is good for most cases, but not for very high traffic with long bursts of data (i.e. streaming huge messages in batches).
    Maybe we need an API change, like returning a ``boolean`` from the ``NettyConnection::write`` method, to let the caller choose what to do as part of its duties, but it feels weird (as a user, I mean).
    On Monday we could discuss it further, or I could ask on the forum for feedback: solving it properly will improve Artemis a lot, I'm sure of it :+1:
    ii - :100: No more crazy properties
    
    Thanks Cleb!!!
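
The idea of a ``write`` that returns a ``boolean`` can be sketched as follows. This is a minimal illustration, not the Artemis API: `BooleanWriteSketch`, `tryWrite` and `onFlushed` are hypothetical names, and the byte counter stands in for whatever accounting a real connection would use.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical connection that refuses writes instead of buffering without bound.
final class BooleanWriteSketch {
   private final long highWaterMark;
   private final AtomicLong pendingBytes = new AtomicLong();

   BooleanWriteSketch(final long highWaterMark) {
      this.highWaterMark = highWaterMark;
   }

   // Accepts the bytes only if they still fit under the high water mark.
   boolean tryWrite(final int size) {
      while (true) {
         final long current = pendingBytes.get();
         if (current + size > highWaterMark) {
            return false; // the caller decides: retry, block, or drop
         }
         if (pendingBytes.compareAndSet(current, current + size)) {
            return true;
         }
      }
   }

   // Called once the bytes have left the buffer.
   void onFlushed(final int size) {
      pendingBytes.addAndGet(-size);
   }

   long pendingBytes() {
      return pendingBytes.get();
   }
}
```

With this shape the flow-control policy lives in the caller, which is the trade-off the comment describes as weird from a user's point of view.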




[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109368909
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
     
           this.listener = listener;
     
    +      this.directDeliver = directDeliver;
    +
           this.batchingEnabled = batchingEnabled;
     
    -      this.directDeliver = directDeliver;
    +      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
    +
    +      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    +   }
    +
    +   private static void waitFor(ChannelPromise promise, long millis) {
    +      try {
    +         final boolean completed = promise.await(millis);
    +         if (!completed) {
    +            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
    +         }
    +      } catch (InterruptedException e) {
    +         throw new ActiveMQInterruptedException(e);
    +      }
    +   }
    +
    +   /**
    +    * Returns an estimation of the current size of the write buffer in the channel.
    +    * To obtain a more precise value is necessary to use the unsafe API of the channel to
    +    * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
    +    * Anyway, both these values are subject to concurrent modifications.
    +    */
    +   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
    +      //Channel::bytesBeforeUnwritable is performing a volatile load
    +      //this is the reason why writeBufferHighWaterMark is passed as an argument
    +      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
    +      assert bytesBeforeUnwritable >= 0;
    +      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
    +      assert writtenBytes >= 0;
    +      return writtenBytes;
    +   }
    +
    +   /**
    +    * When batching is not enabled, it tries to back-pressure the caller thread.
    +    * The back-pressure provided is not before the writeAndFlush request, buf after it: too many threads that are not
    +    * using {@link Channel#isWritable} to know when push unbatched data will risk to cause OOM due to the enqueue of each own {@link Channel#writeAndFlush} requests.
    +    * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it will block {@link Channel#isWritable} to be true.
    +    */
    +   private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
    +                                                           final int readableBytes,
    +                                                           final Channel channel,
    +                                                           final ChannelPromise promise) {
    +      final ChannelFuture future;
    +      if (!channel.isWritable()) {
    +         final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
    +         future = channel.writeAndFlush(bytes, channelPromise);
    +         //is the channel is not writable wait the current request to be flushed, providing backpressuring on the caller thread
    +         if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
    +            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    --- End diff --
    
    > on that case, we never block inside the NettyConnector... we verify isWritable().. and stop writing..
    
    I'm not 100% sure it would work: it is counterintuitive, but not all the pending writes are counted in Netty's write buffer, so having that buffer writable doesn't mean that you can't go OOM :(
    We'll talk about it tomorrow anyway :+1:
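
The concern above can be illustrated with a toy model. It is an assumption-laden sketch, not a description of how any particular Netty version accounts for bytes: `PendingWriteModel` is hypothetical, it is single-threaded on purpose, and it simply assumes that writes submitted from outside the loop sit in a task queue that the writability check does not see.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy, single-threaded model: bytes queued as loop tasks are invisible to isWritable().
final class PendingWriteModel {
   private final long highWaterMark;
   private long bufferedBytes; // bytes the modelled write buffer knows about
   private long queuedBytes;   // bytes still waiting in loop tasks
   private final Queue<Integer> loopTasks = new ArrayDeque<>();

   PendingWriteModel(final long highWaterMark) {
      this.highWaterMark = highWaterMark;
   }

   // A write from a non-loop thread is only enqueued as a task.
   void writeFromOutside(final int size) {
      loopTasks.add(size);
      queuedBytes += size;
   }

   // The loop drains its tasks, moving the bytes into the write buffer.
   void runLoopTasks() {
      Integer size;
      while ((size = loopTasks.poll()) != null) {
         queuedBytes -= size;
         bufferedBytes += size;
      }
   }

   // Writability looks at the write buffer only.
   boolean isWritable() {
      return bufferedBytes <= highWaterMark;
   }

   long totalPendingBytes() {
      return bufferedBytes + queuedBytes;
   }
}
```

In this model a caller that only checks `isWritable()` can enqueue far more than the high water mark before the flag flips, which is the OOM risk described in the comment.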



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109077531
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -46,40 +43,37 @@
     
     public class NettyConnection implements Connection {
     
    -   // Constants -----------------------------------------------------
    -   private static final int BATCHING_BUFFER_SIZE = 8192;
    -
    -   // Attributes ----------------------------------------------------
    +   private static final int DEFAULT_MTU_BYTES = Integer.getInteger("io.netty.mtu", 1460);
    +   //backpressure on unbatched writes is enabled by default
    --- End diff --
    
    I will continue evaluating this PR tomorrow (Friday)



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @franz1981 you will have to call blockUntilWritable on the Large Message Producer, right?



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @franz1981 mind rebasing?



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r108835826
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -45,40 +43,37 @@
     
     public class NettyConnection implements Connection {
     
    -   // Constants -----------------------------------------------------
    -   private static final int BATCHING_BUFFER_SIZE = 8192;
    -
    -   // Attributes ----------------------------------------------------
    +   private static final int DEFAULT_MTU_BYTES = Integer.getInteger("io.netty.mtu", 1460);
    +   //backpressure on unbatched writes is enabled by default
    +   private static final boolean ENABLED_RELAXED_BACK_PRESSURE = !Boolean.getBoolean("io.netty.disable.backpressure");
    +   //it is the limit while waiting the data to be flushed and alerting (if in trace mode) the event
    +   private static final long DEFAULT_BACK_PRESSURE_WAIT_MILLIS = Long.getLong("io.netty.backpressure.millis", 1_000L);
    +   //if not specified the default batch size will be equal to the ChannelConfig::writeBufferHighWaterMark
    +   private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", Integer.MAX_VALUE);
    +   private static final int DEFAULT_WAIT_MILLIS = 10_000;
     
        protected final Channel channel;
    -
    -   private boolean closed;
    -
        private final BaseConnectionLifeCycleListener listener;
    -
    -   private final boolean batchingEnabled;
    -
        private final boolean directDeliver;
    -
    -   private volatile ActiveMQBuffer batchBuffer;
    -
        private final Map<String, Object> configuration;
    -
    -   private final Semaphore writeLock = new Semaphore(1);
    -
    -   private RemotingConnection protocolConnection;
    -
    -   private boolean ready = true;
    -
        /**
         * if {@link #isWritable(ReadyListener)} returns false, we add a callback
         * here for when the connection (or Netty Channel) becomes available again.
         */
    -   private final Deque<ReadyListener> readyListeners = new LinkedList<>();
    +   private final List<ReadyListener> readyListeners = new ArrayList<>();
    --- End diff --
    
    It has better locality than the linked one and is simpler for any kind of garbage collector to manage



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    Something in the logic you wrote is not feeding back the credits...
    
    
    Run quiver with body-size=50k: it just hangs.



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109086290
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -46,40 +43,37 @@
     
     public class NettyConnection implements Connection {
     
    -   // Constants -----------------------------------------------------
    -   private static final int BATCHING_BUFFER_SIZE = 8192;
    -
    -   // Attributes ----------------------------------------------------
    +   private static final int DEFAULT_MTU_BYTES = Integer.getInteger("io.netty.mtu", 1460);
    +   //backpressure on unbatched writes is enabled by default
    --- End diff --
    
    @clebertsuconic Makes sense!



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    I cannot merge this...
    
    This eventually blocks: in certain cases it causes everything to block and hang on AMQP.



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic it uses an await with timeout on the write-and-flush promise offered by the Netty channel; by (Netty) contract it has to return in one of two cases:
    - the timeout occurs (by default io.netty.backpressure.millis=1000 milliseconds)
    - the data are flushed
    If it doesn't return, we could have some issue with the thread pool used by Netty to perform the flush (the I/O threads), but that would be pretty strange! Anyway I need feedback mate, I'll ping you ASAP :+1:
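
The two exit conditions listed above (timeout, or data flushed) can be sketched with the JDK alone. This is an assumption-based illustration: `TimedAwaitSketch` and `awaitFlush` are hypothetical names, and a `CountDownLatch` stands in for the Netty promise; the loop retries after an interrupt, in the spirit of an uninterruptible await.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class TimedAwaitSketch {
   // Returns true if the flush completed within the timeout, false if the timeout elapsed first.
   static boolean awaitFlush(final CountDownLatch flushed, final long timeoutMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      boolean interrupted = false;
      try {
         while (true) {
            final long remaining = deadline - System.nanoTime();
            try {
               // A non-positive remaining time makes await return immediately.
               return flushed.await(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
               // Remember the interrupt and keep waiting for the remaining time.
               interrupted = true;
            }
         }
      } finally {
         if (interrupted) {
            Thread.currentThread().interrupt();
         }
      }
   }
}
```

Either way the caller gets control back after at most the configured timeout, provided the waiting thread itself is not the one expected to complete the flush.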



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @jbertram  I wanted to test with / without it...
    
    Not that I don't believe it's good: I want to check whether a test I'm doing would be affected by it or not. I will talk to Franz this Tuesday.



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic Yes, I'm planning to use it there!



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic I've further simplified the connection usage by adding just a single method to it: [Connection::blockUntilWritable](https://github.com/franz1981/activemq-artemis/blob/95ef25d17eb29910553153caff743f215c3e69be/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java#L59).
    It is meant to be used when it is necessary to block (if the caller thread is allowed to) until the Connection's write buffer has enough room to buffer the request.
    The NettyConnection implementation blocks only if not in the event loop, and it takes into account both the Netty write buffer size and the pending writes on the event loop.
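
A rough, Netty-free sketch of that behaviour follows. The class `BlockUntilWritableSketch`, its two counters and the fixed 100 microsecond park interval are assumptions made for illustration; the event-loop check is left out here, and the real method lives in the linked `Connection` interface.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

final class BlockUntilWritableSketch {
   private final long capacity;
   // Bytes held in the modelled write buffer.
   private final AtomicLong bufferedBytes = new AtomicLong();
   // Bytes of writes still pending on the modelled event loop.
   private final AtomicLong pendingOnLoopBytes = new AtomicLong();

   BlockUntilWritableSketch(final long capacity) {
      this.capacity = capacity;
   }

   void addBuffered(final long bytes) {
      bufferedBytes.addAndGet(bytes);
   }

   void addPendingOnLoop(final long bytes) {
      pendingOnLoopBytes.addAndGet(bytes);
   }

   void releaseBuffered(final long bytes) {
      bufferedBytes.addAndGet(-bytes);
   }

   // Both counters are considered, not just the write buffer.
   boolean canWrite(final int requiredCapacity) {
      final long used = bufferedBytes.get() + pendingOnLoopBytes.get();
      return capacity - used >= requiredCapacity;
   }

   // Polls until there is room or the deadline passes; returns the last observation.
   boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
      final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
      boolean writable;
      while (!(writable = canWrite(requiredCapacity)) && System.nanoTime() - deadline < 0) {
         LockSupport.parkNanos(100_000L);
      }
      return writable;
   }
}
```

The deadline is compared with `System.nanoTime() - deadline < 0`, which stays correct even if the nanosecond counter wraps around.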



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by jbertram <gi...@git.apache.org>.
Github user jbertram commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic, is this ready to be merged?  I've reviewed it and it looks good as far as I can tell, but I'm not terribly familiar with this area of the code-base.



[GitHub] activemq-artemis issue #1119: ARTEMIS-1025 OutOfDirectMemoryError raised fro...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1119
  
    @clebertsuconic I've simplified NettyConnection and made a change to Connection too, to allow flow control to be performed externally:
    [Connection](https://github.com/franz1981/activemq-artemis/commit/84b1ab95705b99297e45c0aa1a82264ceec0a1d9#diff-7a60dfeb163c8c6c3d0dda152da91b9c)
    
    Then, I've moved a blocking use case out of NettyConnection and into [ChannelImpl](https://github.com/franz1981/activemq-artemis/commit/84b1ab95705b99297e45c0aa1a82264ceec0a1d9#diff-6ff722f1dd45dc1c7038f57110f7a70f).
    This will help to avoid OutOfDirectMemory issues in specific use cases.
    Wdyt?




[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r108671820
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -45,40 +43,37 @@
     
     public class NettyConnection implements Connection {
     
    -   // Constants -----------------------------------------------------
    -   private static final int BATCHING_BUFFER_SIZE = 8192;
    -
    -   // Attributes ----------------------------------------------------
    +   private static final int DEFAULT_MTU_BYTES = Integer.getInteger("io.netty.mtu", 1460);
    +   //backpressure on unbatched writes is enabled by default
    +   private static final boolean ENABLED_RELAXED_BACK_PRESSURE = !Boolean.getBoolean("io.netty.disable.backpressure");
    +   //it is the limit while waiting the data to be flushed and alerting (if in trace mode) the event
    +   private static final long DEFAULT_BACK_PRESSURE_WAIT_MILLIS = Long.getLong("io.netty.backpressure.millis", 1_000L);
    +   //if not specified the default batch size will be equal to the ChannelConfig::writeBufferHighWaterMark
    +   private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", Integer.MAX_VALUE);
    +   private static final int DEFAULT_WAIT_MILLIS = 10_000;
     
        protected final Channel channel;
    -
    -   private boolean closed;
    -
        private final BaseConnectionLifeCycleListener listener;
    -
    -   private final boolean batchingEnabled;
    -
        private final boolean directDeliver;
    -
    -   private volatile ActiveMQBuffer batchBuffer;
    -
        private final Map<String, Object> configuration;
    -
    -   private final Semaphore writeLock = new Semaphore(1);
    -
    -   private RemotingConnection protocolConnection;
    -
    -   private boolean ready = true;
    -
        /**
         * if {@link #isWritable(ReadyListener)} returns false, we add a callback
         * here for when the connection (or Netty Channel) becomes available again.
         */
    -   private final Deque<ReadyListener> readyListeners = new LinkedList<>();
    +   private final List<ReadyListener> readyListeners = new ArrayList<>();
    --- End diff --
    
    What was the issue with LinkedList? Why use an ArrayList instead?



[GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109278189
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java ---
    @@ -92,69 +86,143 @@ public NettyConnection(final Map<String, Object> configuration,
     
           this.listener = listener;
     
    +      this.directDeliver = directDeliver;
    +
           this.batchingEnabled = batchingEnabled;
     
    -      this.directDeliver = directDeliver;
    +      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
    +
    +      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    +   }
    +
    +   private static void waitFor(ChannelPromise promise, long millis) {
    +      try {
    +         final boolean completed = promise.await(millis);
    +         if (!completed) {
    +            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
    +         }
    +      } catch (InterruptedException e) {
    +         throw new ActiveMQInterruptedException(e);
    +      }
    +   }
    +
    +   /**
    +    * Returns an estimation of the current size of the write buffer in the channel.
    +    * To obtain a more precise value is necessary to use the unsafe API of the channel to
    +    * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
    +    * Anyway, both these values are subject to concurrent modifications.
    +    */
    +   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
    +      //Channel::bytesBeforeUnwritable is performing a volatile load
    +      //this is the reason why writeBufferHighWaterMark is passed as an argument
    +      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
    +      assert bytesBeforeUnwritable >= 0;
    +      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
    +      assert writtenBytes >= 0;
    +      return writtenBytes;
    +   }
    +
    +   /**
    +    * When batching is not enabled, it tries to back-pressure the caller thread.
    +    * The back-pressure provided is not before the writeAndFlush request, buf after it: too many threads that are not
    +    * using {@link Channel#isWritable} to know when push unbatched data will risk to cause OOM due to the enqueue of each own {@link Channel#writeAndFlush} requests.
    +    * Trying to provide back-pressure before the {@link Channel#writeAndFlush} request could work, but in certain scenarios it will block {@link Channel#isWritable} to be true.
    +    */
    +   private static ChannelFuture backPressuredWriteAndFlush(final ByteBuf bytes,
    +                                                           final int readableBytes,
    +                                                           final Channel channel,
    +                                                           final ChannelPromise promise) {
    +      final ChannelFuture future;
    +      if (!channel.isWritable()) {
    +         final ChannelPromise channelPromise = promise.isVoid() ? channel.newPromise() : promise;
    +         future = channel.writeAndFlush(bytes, channelPromise);
    +         //is the channel is not writable wait the current request to be flushed, providing backpressuring on the caller thread
    +         if (!channel.isWritable() && !future.awaitUninterruptibly(DEFAULT_BACK_PRESSURE_WAIT_MILLIS)) {
    +            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    --- End diff --
    
    I agree with the warning and with the reaction strategy: a sensible default plus customization via connection properties would be welcome.
    I'm personally a fan of "kill the slowest connections", but I'm worried it could lead to a storm of users complaining that, while sending 1 [EB](https://it.wikipedia.org/wiki/Exabyte) of messages, the broker strangely died while screaming: ``"WARN: killed connection X - too slow; required 10 days to send -9223372036854775808 bytes"``  :)
    I'm joking, but I'm genuinely worried about breaking everything by adding new behaviours here that would break (too much) the expectations of any piece of code that requests to write something on the network: maybe an external job (running in the event loop) that monitors the connections, detects the slowest ones (using the new methods I've added to monitor the buffering state) and applies the configured countermeasures could do the job, leaving the ``NettyConnection`` simpler and with fewer responsibilities, wdyt?
    Just thinking out loud
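
The external monitoring job floated above might look roughly like this. Everything here is hypothetical: `SlowConnectionMonitorSketch`, `findSlow`, the map of pending bytes per connection and the threshold are illustrative only, and the countermeasure (warn, throttle, close) is deliberately left to the caller.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class SlowConnectionMonitorSketch {
   // Returns the ids of connections whose pending bytes exceed the threshold, sorted by id.
   static List<String> findSlow(final Map<String, Long> pendingBytesByConnection, final long thresholdBytes) {
      final List<String> slow = new ArrayList<>();
      for (final Map.Entry<String, Long> entry : pendingBytesByConnection.entrySet()) {
         if (entry.getValue() > thresholdBytes) {
            slow.add(entry.getKey());
         }
      }
      Collections.sort(slow);
      return slow;
   }
}
```

A periodic task could call `findSlow` with a snapshot of the buffering state and apply whatever policy is configured, keeping that logic out of ``NettyConnection``.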


