You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/04/06 15:28:49 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1025 Improve flow control on NettyConnection

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d0ae3f25a -> 25b5ddd90


ARTEMIS-1025 Improve flow control on NettyConnection


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/27cfb2d9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/27cfb2d9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/27cfb2d9

Branch: refs/heads/master
Commit: 27cfb2d90208403cec6727dd595f9dd1380ef441
Parents: d0ae3f2
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Mar 15 14:52:52 2017 +0100
Committer: Francesco Nigro <ni...@gmail.com>
Committed: Thu Apr 6 17:22:05 2017 +0200

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnection.java    | 474 +++++++++++--------
 .../artemis/spi/core/remoting/Connection.java   |  17 +
 2 files changed, 300 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27cfb2d9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 679844a..d8b2315 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -17,22 +17,21 @@
 package org.apache.activemq.artemis.core.remoting.impl.netty;
 
 import java.net.SocketAddress;
-import java.util.Deque;
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
 import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -46,40 +45,35 @@ import org.apache.activemq.artemis.utils.IPV6Util;
 
 public class NettyConnection implements Connection {
 
-   // Constants -----------------------------------------------------
-   private static final int BATCHING_BUFFER_SIZE = 8192;
-
-   // Attributes ----------------------------------------------------
+   private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192);
+   private static final int DEFAULT_WAIT_MILLIS = 10_000;
 
    protected final Channel channel;
-
-   private boolean closed;
-
    private final BaseConnectionLifeCycleListener listener;
-
-   private final boolean batchingEnabled;
-
    private final boolean directDeliver;
-
-   private volatile ActiveMQBuffer batchBuffer;
-
    private final Map<String, Object> configuration;
-
-   private final Semaphore writeLock = new Semaphore(1);
-
-   private RemotingConnection protocolConnection;
-
-   private boolean ready = true;
-
    /**
     * if {@link #isWritable(ReadyListener)} returns false, we add a callback
     * here for when the connection (or Netty Channel) becomes available again.
     */
-   private final Deque<ReadyListener> readyListeners = new LinkedList<>();
+   private final List<ReadyListener> readyListeners = new ArrayList<>();
+   private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = ThreadLocal.withInitial(ArrayList::new);
 
-   // Static --------------------------------------------------------
+   private final boolean batchingEnabled;
+   private final int writeBufferHighWaterMark;
+   private final int batchLimit;
 
-   // Constructors --------------------------------------------------
+   /**
+    * This counter is splitted in 2 variables to write it with less performance
+    * impact: no volatile get is required to update its value
+    */
+   private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong();
+   private long pendingWritesOnEventLoop = 0;
+
+   private boolean closed;
+   private RemotingConnection protocolConnection;
+
+   private boolean ready = true;
 
    public NettyConnection(final Map<String, Object> configuration,
                           final Channel channel,
@@ -92,28 +86,72 @@ public class NettyConnection implements Connection {
 
       this.listener = listener;
 
+      this.directDeliver = directDeliver;
+
       this.batchingEnabled = batchingEnabled;
 
-      this.directDeliver = directDeliver;
+      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
+
+      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
+   }
+
+   private static void waitFor(ChannelPromise promise, long millis) {
+      try {
+         final boolean completed = promise.await(millis);
+         if (!completed) {
+            ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
+         }
+      } catch (InterruptedException e) {
+         throw new ActiveMQInterruptedException(e);
+      }
+   }
+
+   /**
+    * Returns an estimation of the current size of the write buffer in the channel.
+    * To obtain a more precise value is necessary to use the unsafe API of the channel to
+    * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
+    * Anyway, both these values are subject to concurrent modifications.
+    */
+   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
+      //Channel::bytesBeforeUnwritable is performing a volatile load
+      //this is the reason why writeBufferHighWaterMark is passed as an argument
+      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
+      assert bytesBeforeUnwritable >= 0;
+      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
+      assert writtenBytes >= 0;
+      return writtenBytes;
+   }
+
+   public final int pendingWritesOnChannel() {
+      return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    }
 
-   // Public --------------------------------------------------------
+   public final long pendingWritesOnEventLoop() {
+      final EventLoop eventLoop = channel.eventLoop();
+      final boolean inEventLoop = eventLoop.inEventLoop();
+      final long pendingWritesOnEventLoop;
+      if (inEventLoop) {
+         pendingWritesOnEventLoop = this.pendingWritesOnEventLoop;
+      } else {
+         pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get();
+      }
+      return pendingWritesOnEventLoop;
+   }
 
-   public Channel getNettyChannel() {
+   public final Channel getNettyChannel() {
       return channel;
    }
-   // Connection implementation ----------------------------
 
    @Override
-   public void setAutoRead(boolean autoRead) {
+   public final void setAutoRead(boolean autoRead) {
       channel.config().setAutoRead(autoRead);
    }
 
    @Override
-   public boolean isWritable(ReadyListener callback) {
+   public final boolean isWritable(ReadyListener callback) {
       synchronized (readyListeners) {
          if (!ready) {
-            readyListeners.push(callback);
+            readyListeners.add(callback);
          }
 
          return ready;
@@ -121,40 +159,44 @@ public class NettyConnection implements Connection {
    }
 
    @Override
-   public void fireReady(final boolean ready) {
-      LinkedList<ReadyListener> readyToCall = null;
+   public final void fireReady(final boolean ready) {
+      final ArrayList<ReadyListener> readyToCall = localListenersPool.get();
       synchronized (readyListeners) {
          this.ready = ready;
 
          if (ready) {
-            for (;;) {
-               ReadyListener readyListener = readyListeners.poll();
-               if (readyListener == null) {
-                  break;
-               }
-
-               if (readyToCall == null) {
-                  readyToCall = new LinkedList<>();
+            final int size = this.readyListeners.size();
+            readyToCall.ensureCapacity(size);
+            try {
+               for (int i = 0; i < size; i++) {
+                  final ReadyListener readyListener = readyListeners.get(i);
+                  if (readyListener == null) {
+                     break;
+                  }
+                  readyToCall.add(readyListener);
                }
-
-               readyToCall.add(readyListener);
+            } finally {
+               readyListeners.clear();
             }
          }
       }
-
-      if (readyToCall != null) {
-         for (ReadyListener readyListener : readyToCall) {
+      try {
+         final int size = readyToCall.size();
+         for (int i = 0; i < size; i++) {
             try {
+               final ReadyListener readyListener = readyToCall.get(i);
                readyListener.readyForWriting();
             } catch (Throwable logOnly) {
                ActiveMQClientLogger.LOGGER.warn(logOnly.getMessage(), logOnly);
             }
          }
+      } finally {
+         readyToCall.clear();
       }
    }
 
    @Override
-   public void forceClose() {
+   public final void forceClose() {
       if (channel != null) {
          try {
             channel.close();
@@ -169,38 +211,35 @@ public class NettyConnection implements Connection {
     *
     * @return
     */
-   public Channel getChannel() {
+   public final Channel getChannel() {
       return channel;
    }
 
    @Override
-   public RemotingConnection getProtocolConnection() {
+   public final RemotingConnection getProtocolConnection() {
       return protocolConnection;
    }
 
    @Override
-   public void setProtocolConnection(RemotingConnection protocolConnection) {
+   public final void setProtocolConnection(RemotingConnection protocolConnection) {
       this.protocolConnection = protocolConnection;
    }
 
    @Override
-   public void close() {
+   public final void close() {
       if (closed) {
          return;
       }
-
-      final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
       EventLoop eventLoop = channel.eventLoop();
       boolean inEventLoop = eventLoop.inEventLoop();
       //if we are in an event loop we need to close the channel after the writes have finished
       if (!inEventLoop) {
+         final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
          closeSSLAndChannel(sslHandler, channel, false);
       } else {
-         eventLoop.execute(new Runnable() {
-            @Override
-            public void run() {
-               closeSSLAndChannel(sslHandler, channel, true);
-            }
+         eventLoop.execute(() -> {
+            final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
+            closeSSLAndChannel(sslHandler, channel, true);
          });
       }
 
@@ -211,143 +250,206 @@ public class NettyConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
-      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
+      try {
+         return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
+      } catch (OutOfMemoryError oom) {
+         final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+         ActiveMQClientLogger.LOGGER.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
+         throw oom;
+      }
    }
 
    @Override
-   public Object getID() {
+   public final Object getID() {
       // TODO: Think of it
       return channel.hashCode();
    }
 
    // This is called periodically to flush the batch buffer
    @Override
-   public void checkFlushBatchBuffer() {
-      if (!batchingEnabled) {
-         return;
-      }
-
-      if (writeLock.tryAcquire()) {
-         try {
-            if (batchBuffer != null && batchBuffer.readable()) {
-               channel.writeAndFlush(batchBuffer.byteBuf());
-
-               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
-            }
-         } finally {
-            writeLock.release();
+   public final void checkFlushBatchBuffer() {
+      if (this.batchingEnabled) {
+         //perform the flush only if necessary
+         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+         if (batchBufferSize > 0) {
+            this.channel.flush();
          }
       }
    }
 
    @Override
-   public void write(final ActiveMQBuffer buffer) {
+   public final void write(final ActiveMQBuffer buffer) {
       write(buffer, false, false);
    }
 
    @Override
-   public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
+   public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
       write(buffer, flush, batched, null);
    }
 
    @Override
-   public void write(ActiveMQBuffer buffer,
-                     final boolean flush,
-                     final boolean batched,
-                     final ChannelFutureListener futureListener) {
-
-      try {
-         writeLock.acquire();
-
-         try {
-            if (batchBuffer == null && batchingEnabled && batched && !flush) {
-               // Lazily create batch buffer
-
-               batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
-            }
-
-            if (batchBuffer != null) {
-               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
-               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) {
-                  // If the batch buffer is full or it's flush param or not batched then flush the buffer
-
-                  buffer = batchBuffer;
-               } else {
-                  return;
-               }
-
-               if (!batched || flush) {
-                  batchBuffer = null;
-               } else {
-                  // Create a new buffer
+   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+      final boolean isAllowedToBlock = isAllowedToBlock();
+      if (!isAllowedToBlock) {
+         if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQClientLogger.LOGGER.debug("Calling blockUntilWritable using a thread where it's not allowed");
+         }
+         return canWrite(requiredCapacity);
+      } else {
+         final long timeoutNanos = timeUnit.toNanos(timeout);
+         final long deadline = System.nanoTime() + timeoutNanos;
+         //choose wait time unit size
+         final long parkNanos;
+         //if is requested to wait more than a millisecond than we could use
+         if (timeoutNanos >= 1_000_000L) {
+            parkNanos = 100_000L;
+         } else {
+            //reduce it doesn't make sense, only a spin loop could be enough precise with the most OS
+            parkNanos = 1000L;
+         }
+         boolean canWrite;
+         while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime() < deadline) {
+            LockSupport.parkNanos(parkNanos);
+         }
+         return canWrite;
+      }
+   }
 
-                  batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
-               }
-            }
+   private boolean isAllowedToBlock() {
+      final EventLoop eventLoop = channel.eventLoop();
+      final boolean inEventLoop = eventLoop.inEventLoop();
+      return !inEventLoop;
+   }
 
-            // depending on if we need to flush or not we can use a voidPromise or
-            // use a normal promise
-            final ByteBuf buf = buffer.byteBuf();
-            final ChannelPromise promise;
-            if (flush || futureListener != null) {
-               promise = channel.newPromise();
-            } else {
-               promise = channel.voidPromise();
-            }
+   private boolean canWrite(final int requiredCapacity) {
+      //evaluate if the write request could be taken:
+      //there is enough space in the write buffer?
+      //The pending writes on event loop will eventually go into the Netty write buffer, hence consider them
+      //as part of the heuristic!
+      final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop();
+      final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel();
+      final boolean canWrite;
+      if (requiredCapacity > this.writeBufferHighWaterMark) {
+         canWrite = totalPendingWrites == 0;
+      } else {
+         canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark;
+      }
+      return canWrite;
+   }
 
-            EventLoop eventLoop = channel.eventLoop();
-            boolean inEventLoop = eventLoop.inEventLoop();
-            if (!inEventLoop) {
-               if (futureListener != null) {
-                  channel.writeAndFlush(buf, promise).addListener(futureListener);
-               } else {
-                  channel.writeAndFlush(buf, promise);
-               }
-            } else {
-               // create a task which will be picked up by the eventloop and trigger the write.
-               // This is mainly needed as this method is triggered by different threads for the same channel.
-               // if we not do this we may produce out of order writes.
-               final Runnable task = new Runnable() {
-                  @Override
-                  public void run() {
-                     if (futureListener != null) {
-                        channel.writeAndFlush(buf, promise).addListener(futureListener);
-                     } else {
-                        channel.writeAndFlush(buf, promise);
-                     }
-                  }
-               };
-               // execute the task on the eventloop
-               eventLoop.execute(task);
-            }
+   @Override
+   public final void write(ActiveMQBuffer buffer,
+                           final boolean flush,
+                           final boolean batched,
+                           final ChannelFutureListener futureListener) {
+      final int readableBytes = buffer.readableBytes();
+      if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
+         final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
+         if (remainingBytes < 0) {
+            ActiveMQClientLogger.LOGGER.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
+         }
+      }
+      //no need to lock because the Netty's channel is thread-safe
+      //and the order of write is ensured by the order of the write calls
+      final EventLoop eventLoop = channel.eventLoop();
+      final boolean inEventLoop = eventLoop.inEventLoop();
+      if (!inEventLoop) {
+         writeNotInEventLoop(buffer, flush, batched, futureListener);
+      } else {
+         // OLD COMMENT:
+         // create a task which will be picked up by the eventloop and trigger the write.
+         // This is mainly needed as this method is triggered by different threads for the same channel.
+         // if we not do this we may produce out of order writes.
+         // NOTE:
+         // the submitted task does not effect in any way the current written size in the batch
+         // until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!!
+         // To solve it, will be necessary to manually perform the count of the current batch instead of rely on the
+         // Channel:Config::writeBufferHighWaterMark value.
+         this.pendingWritesOnEventLoop += readableBytes;
+         this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
+         eventLoop.execute(() -> {
+            this.pendingWritesOnEventLoop -= readableBytes;
+            this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
+            writeInEventLoop(buffer, flush, batched, futureListener);
+         });
+      }
+   }
 
-            // only try to wait if not in the eventloop otherwise we will produce a deadlock
-            if (flush && !inEventLoop) {
-               while (true) {
-                  try {
-                     boolean ok = promise.await(10000);
+   private void writeNotInEventLoop(ActiveMQBuffer buffer,
+                                    final boolean flush,
+                                    final boolean batched,
+                                    final ChannelFutureListener futureListener) {
+      final Channel channel = this.channel;
+      final ChannelPromise promise;
+      if (flush || (futureListener != null)) {
+         promise = channel.newPromise();
+      } else {
+         promise = channel.voidPromise();
+      }
+      final ChannelFuture future;
+      final ByteBuf bytes = buffer.byteBuf();
+      final int readableBytes = bytes.readableBytes();
+      assert readableBytes >= 0;
+      final int writeBatchSize = this.batchLimit;
+      final boolean batchingEnabled = this.batchingEnabled;
+      if (batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
+         future = writeBatch(bytes, readableBytes, promise);
+      } else {
+         future = channel.writeAndFlush(bytes, promise);
+      }
+      if (futureListener != null) {
+         future.addListener(futureListener);
+      }
+      if (flush) {
+         //NOTE: this code path seems used only on RemotingConnection::disconnect
+         waitFor(promise, DEFAULT_WAIT_MILLIS);
+      }
+   }
 
-                     if (!ok) {
-                        ActiveMQClientLogger.LOGGER.timeoutFlushingPacket();
-                     }
+   private void writeInEventLoop(ActiveMQBuffer buffer,
+                                 final boolean flush,
+                                 final boolean batched,
+                                 final ChannelFutureListener futureListener) {
+      //no need to lock because the Netty's channel is thread-safe
+      //and the order of write is ensured by the order of the write calls
+      final ChannelPromise promise;
+      if (futureListener != null) {
+         promise = channel.newPromise();
+      } else {
+         promise = channel.voidPromise();
+      }
+      final ChannelFuture future;
+      final ByteBuf bytes = buffer.byteBuf();
+      final int readableBytes = bytes.readableBytes();
+      final int writeBatchSize = this.batchLimit;
+      if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
+         future = writeBatch(bytes, readableBytes, promise);
+      } else {
+         future = channel.writeAndFlush(bytes, promise);
+      }
+      if (futureListener != null) {
+         future.addListener(futureListener);
+      }
+   }
 
-                     break;
-                  } catch (InterruptedException e) {
-                     throw new ActiveMQInterruptedException(e);
-                  }
-               }
-            }
-         } finally {
-            writeLock.release();
-         }
-      } catch (InterruptedException e) {
-         throw new ActiveMQInterruptedException(e);
+   private ChannelFuture writeBatch(final ByteBuf bytes, final int readableBytes, final ChannelPromise promise) {
+      final int batchBufferSize = batchBufferSize(channel, this.writeBufferHighWaterMark);
+      final int nextBatchSize = batchBufferSize + readableBytes;
+      if (nextBatchSize > batchLimit) {
+         //request to flush before writing, to create the chance to make the channel writable again
+         channel.flush();
+         //let netty use its write batching ability
+         return channel.write(bytes, promise);
+      } else if (nextBatchSize == batchLimit) {
+         return channel.writeAndFlush(bytes, promise);
+      } else {
+         //let netty use its write batching ability
+         return channel.write(bytes, promise);
       }
    }
 
    @Override
-   public String getRemoteAddress() {
+   public final String getRemoteAddress() {
       SocketAddress address = channel.remoteAddress();
       if (address == null) {
          return null;
@@ -356,7 +458,7 @@ public class NettyConnection implements Connection {
    }
 
    @Override
-   public String getLocalAddress() {
+   public final String getLocalAddress() {
       SocketAddress address = channel.localAddress();
       if (address == null) {
          return null;
@@ -364,18 +466,18 @@ public class NettyConnection implements Connection {
       return "tcp://" + IPV6Util.encloseHost(address.toString());
    }
 
-   public boolean isDirectDeliver() {
+   public final boolean isDirectDeliver() {
       return directDeliver;
    }
 
    //never allow this
    @Override
-   public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
+   public final ActiveMQPrincipal getDefaultActiveMQPrincipal() {
       return null;
    }
 
    @Override
-   public TransportConfiguration getConnectorConfig() {
+   public final TransportConfiguration getConnectorConfig() {
       if (configuration != null) {
          return new TransportConfiguration(NettyConnectorFactory.class.getName(), this.configuration);
       } else {
@@ -384,46 +486,36 @@ public class NettyConnection implements Connection {
    }
 
    @Override
-   public boolean isUsingProtocolHandling() {
+   public final boolean isUsingProtocolHandling() {
       return true;
    }
 
-   // Public --------------------------------------------------------
-
    @Override
-   public String toString() {
+   public final String toString() {
       return super.toString() + "[local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
    }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
    private void closeSSLAndChannel(SslHandler sslHandler, final Channel channel, boolean inEventLoop) {
+      checkFlushBatchBuffer();
       if (sslHandler != null) {
          try {
             ChannelFuture sslCloseFuture = sslHandler.close();
-            sslCloseFuture.addListener(new GenericFutureListener<ChannelFuture>() {
-               @Override
-               public void operationComplete(ChannelFuture future) throws Exception {
-                  channel.close();
-               }
-            });
-            if (!inEventLoop && !sslCloseFuture.awaitUninterruptibly(10000)) {
+            sslCloseFuture.addListener(future -> channel.close());
+            if (!inEventLoop && !sslCloseFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) {
                ActiveMQClientLogger.LOGGER.timeoutClosingSSL();
             }
          } catch (Throwable t) {
             // ignore
+            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+               ActiveMQClientLogger.LOGGER.trace(t.getMessage(), t);
+            }
          }
       } else {
          ChannelFuture closeFuture = channel.close();
-         if (!inEventLoop && !closeFuture.awaitUninterruptibly(10000)) {
+         if (!inEventLoop && !closeFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) {
             ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel();
          }
       }
    }
-   // Inner classes -------------------------------------------------
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/27cfb2d9/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 7ab0c40..56d1bc3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.spi.core.remoting;
 
+import java.util.concurrent.TimeUnit;
+
 import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -41,6 +43,21 @@ public interface Connection {
 
    boolean isWritable(ReadyListener listener);
 
+   /**
+    * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
+    * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control
+    * only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers.
+    * If the current thread is not allowed to block the timeout will be ignored dependently on the connection type.
+    *
+    * @param requiredCapacity the capacity in bytes to be enqueued
+    * @param timeout          the maximum time to wait
+    * @param timeUnit         the time unit of the timeout argument
+    * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
+    */
+   default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+      return true;
+   }
+
    void fireReady(boolean ready);
 
    /**


[2/2] activemq-artemis git commit: This closes #1119

Posted by cl...@apache.org.
This closes #1119


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/25b5ddd9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/25b5ddd9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/25b5ddd9

Branch: refs/heads/master
Commit: 25b5ddd90c6e2b5cc8f3bba0b7acf3bdb2f1d141
Parents: d0ae3f2 27cfb2d
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 6 11:25:18 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 6 11:25:18 2017 -0400

----------------------------------------------------------------------
 .../remoting/impl/netty/NettyConnection.java    | 474 +++++++++++--------
 .../artemis/spi/core/remoting/Connection.java   |  17 +
 2 files changed, 300 insertions(+), 191 deletions(-)
----------------------------------------------------------------------