You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/09 11:37:46 UTC

[3/5] storm git commit: STORM-1038: Upgrade to latest Netty

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
index 45fa2d8..1818dfa 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
@@ -12,15 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class SaslNettyClientState {
 
-    public static final ChannelLocal<SaslNettyClient> getSaslNettyClient = new ChannelLocal<SaslNettyClient>() {
-        protected SaslNettyClient initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<SaslNettyClient> SASL_NETTY_CLIENT = AttributeKey.valueOf("sasl.netty.client");
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
index f8cb386..2c79519 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
@@ -12,14 +12,9 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class SaslNettyServerState {
 
-    public static final ChannelLocal<SaslNettyServer> getSaslNettyServer = new ChannelLocal<SaslNettyServer>() {
-        protected SaslNettyServer initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<SaslNettyServer> SASL_NETTY_SERVER = AttributeKey.valueOf("sasl.netty.server");
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
index 293cd38..25b0aa2 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -13,21 +14,18 @@
 package org.apache.storm.messaging.netty;
 
 import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormClientHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(SaslStormClientHandler.class);
-    long start_time;
-    private ISaslClient client;
+    private final long start_time;
+    private final ISaslClient client;
     /**
      * Used for client or server's token to send or receive from each other.
      */
@@ -41,25 +39,22 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-                                 ChannelStateEvent event) {
-        // register the newly established channel
-        Channel channel = ctx.getChannel();
-        client.channelConnected(channel);
+    public void channelActive(ChannelHandlerContext ctx) {
+        Channel channel = ctx.channel();
 
+        LOG.info("Connection established from " + channel.localAddress()
+            + " to " + channel.remoteAddress());
         try {
-            SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
-                .get(channel);
+            SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
 
             if (saslNettyClient == null) {
                 LOG.debug("Creating saslNettyClient now " + "for channel: "
                           + channel);
                 saslNettyClient = new SaslNettyClient(name, token);
-                SaslNettyClientState.getSaslNettyClient.set(channel,
-                                                            saslNettyClient);
+                channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient);
             }
             LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST");
-            channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+            channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST, channel.voidPromise());
         } catch (Exception e) {
             LOG.error("Failed to authenticate with server " + "due to error: ",
                       e);
@@ -67,54 +62,65 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
-        throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
         LOG.debug("send/recv time (ms): {}",
                   (System.currentTimeMillis() - start_time));
 
-        Channel channel = ctx.getChannel();
-
+        // examine the response message from server
+        if (message instanceof ControlMessage) {
+            handleControlMessage(ctx, (ControlMessage) message);
+        } else if (message instanceof SaslMessageToken) {
+            handleSaslMessageToken(ctx, (SaslMessageToken) message);
+        } else {
+            LOG.error("Unexpected message from server: {}", message);
+        }
+    }
+    
+    private SaslNettyClient getChannelSaslNettyClient(Channel channel) throws Exception {
         // Generate SASL response to server using Channel-local SASL client.
-        SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
-            .get(channel);
+        SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
         if (saslNettyClient == null) {
             throw new Exception("saslNettyClient was unexpectedly "
-                                + "null for channel: " + channel);
+                + "null for channel: " + channel);
         }
+        return saslNettyClient;
+    }
+    
+    private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception {
+        SaslNettyClient saslNettyClient = getChannelSaslNettyClient(ctx.channel());
+        if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
+            LOG.debug("Server has sent us the SaslComplete "
+                + "message. Allowing normal work to proceed.");
 
-        // examine the response message from server
-        if (event.getMessage() instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage) event.getMessage();
-            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
-                LOG.debug("Server has sent us the SaslComplete "
-                          + "message. Allowing normal work to proceed.");
-
-                if (!saslNettyClient.isComplete()) {
-                    LOG.error("Server returned a Sasl-complete message, "
-                              + "but as far as we can tell, we are not authenticated yet.");
-                    throw new Exception("Server returned a "
-                                        + "Sasl-complete message, but as far as "
-                                        + "we can tell, we are not authenticated yet.");
-                }
-                ctx.getPipeline().remove(this);
-                this.client.channelReady();
-
-                // We call fireMessageReceived since the client is allowed to
-                // perform this request. The client's request will now proceed
-                // to the next pipeline component namely StormClientHandler.
-                Channels.fireMessageReceived(ctx, msg);
-                return;
+            if (!saslNettyClient.isComplete()) {
+                LOG.error("Server returned a Sasl-complete message, "
+                    + "but as far as we can tell, we are not authenticated yet.");
+                throw new Exception("Server returned a "
+                    + "Sasl-complete message, but as far as "
+                    + "we can tell, we are not authenticated yet.");
             }
+            ctx.pipeline().remove(this);
+            this.client.channelReady(ctx.channel());
+
+            // We call fireChannelRead since the client is allowed to
+            // perform this request. The client's request will now proceed
+            // to the next pipeline component namely StormClientHandler.
+            ctx.fireChannelRead(controlMessage);
+        } else {
+            LOG.warn("Unexpected control message: {}", controlMessage);
         }
-        SaslMessageToken saslTokenMessage = (SaslMessageToken) event
-            .getMessage();
+    }
+    
+    private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
+        Channel channel = ctx.channel();
+        SaslNettyClient saslNettyClient = getChannelSaslNettyClient(channel);
         LOG.debug("Responding to server's token of length: "
-                  + saslTokenMessage.getSaslToken().length);
+                  + saslMessageToken.getSaslToken().length);
 
         // Generate SASL response (but we only actually send the response if
         // it's non-null.
         byte[] responseToServer = saslNettyClient
-            .saslResponse(saslTokenMessage);
+            .saslResponse(saslMessageToken);
         if (responseToServer == null) {
             // If we generate a null response, then authentication has completed
             // (if not, warn), and return without sending a response back to the
@@ -127,7 +133,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
                 throw new Exception("Server response is null, but as far as "
                                     + "we can tell, we are not authenticated yet.");
             }
-            this.client.channelReady();
+            this.client.channelReady(channel);
             return;
         } else {
             LOG.debug("Response to server token has length:"
@@ -136,7 +142,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
         // Construct a message containing the SASL response and send it to the
         // server.
         SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
-        channel.write(saslResponse);
+        channel.writeAndFlush(saslResponse, channel.voidPromise());
     }
 
     private void getSASLCredentials() throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
index 7db90db..64a8bae 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
@@ -12,21 +12,18 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Authorize or deny client requests based on existence and completeness of client's SASL authentication.
  */
-public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormServerAuthorizeHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
-        .getLogger(SaslStormServerHandler.class);
+        .getLogger(SaslStormServerAuthorizeHandler.class);
 
     /**
      * Constructor.
@@ -35,19 +32,16 @@ public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandle
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             return;
         }
 
-        Channel channel = ctx.getChannel();
         LOG.debug("messageReceived: Checking whether the client is authorized to send messages to the server ");
 
         // Authorize: client is allowed to doRequest() if and only if the client
         // has successfully authenticated with this server.
-        SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-            .get(channel);
+        SaslNettyServer saslNettyServer = ctx.channel().attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
 
         if (saslNettyServer == null) {
             LOG.warn("messageReceived: This client is *NOT* authorized to perform "
@@ -70,9 +64,9 @@ public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandle
                   + saslNettyServer.getUserName()
                   + " is authorized to do request " + "on server.");
 
-        // We call fireMessageReceived since the client is allowed to perform
+        // We call fireChannelRead since the client is allowed to perform
         // this request. The client's request will now proceed to the next
         // pipeline component.
-        Channels.fireMessageReceived(ctx, msg);
+        ctx.fireChannelRead(msg);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
index 40f8916..ce69a6f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
@@ -13,20 +13,17 @@
 package org.apache.storm.messaging.netty;
 
 import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormServerHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(SaslStormServerHandler.class);
-    ISaslServer server;
+    private final ISaslServer server;
     /**
      * Used for client or server's token to send or receive from each other.
      */
@@ -39,41 +36,39 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             return;
         }
 
-        Channel channel = ctx.getChannel();
+        Channel channel = ctx.channel();
 
         if (msg instanceof ControlMessage
-            && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+            && msg == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
             // initialize server-side SASL functionality, if we haven't yet
             // (in which case we are looking at the first SASL message from the
             // client).
-            SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-                .get(channel);
+            SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
             if (saslNettyServer == null) {
                 LOG.debug("No saslNettyServer for " + channel
-                          + " yet; creating now, with topology token: ");
+                          + " yet; creating now, with topology token: " + topologyName);
                 try {
                     saslNettyServer = new SaslNettyServer(topologyName, token);
-                } catch (IOException ioe) {
+                    LOG.debug("SaslNettyServer for " + channel
+                        + "created with topology token: " + topologyName);
+                } catch (IOException e) {
                     LOG.error("Error occurred while creating saslNettyServer on server "
-                              + channel.getLocalAddress()
+                              + channel.localAddress()
                               + " for client "
-                              + channel.getRemoteAddress());
-                    saslNettyServer = null;
+                              + channel.remoteAddress());
+                    throw new IllegalStateException("Failed to set SaslNettyServerState.SASL_NETTY_SERVER");
                 }
 
-                SaslNettyServerState.getSaslNettyServer.set(channel,
-                                                            saslNettyServer);
+                channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).set(saslNettyServer);
             } else {
                 LOG.debug("Found existing saslNettyServer on server:"
-                          + channel.getLocalAddress() + " for client "
-                          + channel.getRemoteAddress());
+                          + channel.localAddress() + " for client "
+                          + channel.remoteAddress());
             }
 
             LOG.debug("processToken:  With nettyServer: " + saslNettyServer
@@ -83,7 +78,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             saslTokenMessageRequest = new SaslMessageToken(
                 saslNettyServer.response(new byte[0]));
             // Send response to client.
-            channel.write(saslTokenMessageRequest);
+            channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
             // do not send upstream to other handlers: no further action needs
             // to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
             return;
@@ -93,8 +88,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             // initialize server-side SASL functionality, if we haven't yet
             // (in which case we are looking at the first SASL message from the
             // client).
-            SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-                .get(channel);
+            SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
             if (saslNettyServer == null) {
                 throw new Exception("saslNettyServer was unexpectedly "
                                     + "null for channel: " + channel);
@@ -104,17 +98,17 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
                                              .getSaslToken()));
 
             // Send response to client.
-            channel.write(saslTokenMessageRequest);
+            channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
 
             if (saslNettyServer.isComplete()) {
                 // If authentication of client is complete, we will also send a
                 // SASL-Complete message to the client.
                 LOG.debug("SASL authentication is complete for client with "
                           + "username: " + saslNettyServer.getUserName());
-                channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                 LOG.debug("Removing SaslServerHandler from pipeline since SASL "
                           + "authentication is complete.");
-                ctx.getPipeline().remove(this);
+                ctx.pipeline().remove(this);
                 server.authenticated(channel);
             }
         } else {
@@ -125,15 +119,13 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             // authentication has not completed.
             LOG.warn("Sending upstream an unexpected non-SASL message :  "
                      + msg);
-            Channels.fireMessageReceived(ctx, msg);
+            ctx.fireChannelRead(msg);
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        if (server != null) {
-            server.closeChannel(e.getChannel());
-        }
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.close();
     }
 
     private void getSASLCredentials() throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index d5b2c8a..6500abe 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -13,14 +13,13 @@
 package org.apache.storm.messaging.netty;
 
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -33,37 +32,45 @@ import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ServerBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.group.ChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.ServerBootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelFuture;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.group.ChannelGroup;
+import org.apache.storm.shade.io.netty.channel.group.DefaultChannelGroup;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.storm.shade.io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer {
 
+    public static final int LOAD_METRICS_TASK_ID = -1;
+    
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
-    final ChannelFactory factory;
-    final ServerBootstrap bootstrap;
+    private final EventLoopGroup bossEventLoopGroup;
+    private final EventLoopGroup workerEventLoopGroup;
+    private final ServerBootstrap bootstrap;
     private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
     private final AtomicInteger messagesDequeued = new AtomicInteger(0);
     private final int boundPort;
-    Map<String, Object> topoConf;
-    int port;
-    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
-    KryoValuesSerializer _ser;
-    KryoValuesDeserializer deser;
+    private final Map<String, Object> topoConf;
+    private final int port;
+    private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
+    private final KryoValuesSerializer ser;
+    private final KryoValuesDeserializer deser;
     private volatile boolean closing = false;
-    private IConnectionCallback _cb = null;
+    private IConnectionCallback cb = null;
     private Supplier<Object> newConnectionResponse;
 
     Server(Map<String, Object> topoConf, int port) {
         this.topoConf = topoConf;
         this.port = port;
-        _ser = new KryoValuesSerializer(topoConf);
+        ser = new KryoValuesSerializer(topoConf);
         deser = new KryoValuesDeserializer(topoConf);
 
         // Configure the server.
@@ -74,30 +81,33 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         ThreadFactory bossFactory = new NettyRenameThreadFactory(netty_name() + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory(netty_name() + "-worker");
 
-        if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                        Executors.newCachedThreadPool(workerFactory), maxWorkers);
-        } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                        Executors.newCachedThreadPool(workerFactory));
-        }
+        bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory);
+        // 0 means DEFAULT_EVENT_LOOP_THREADS
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
 
         LOG.info("Create Netty Server " + netty_name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
 
-        bootstrap = new ServerBootstrap(factory);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.receiveBufferSize", buffer_size);
-        bootstrap.setOption("child.keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("backlog", backlog);
-
-        // Set up the pipeline factory.
-        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+        bootstrap = new ServerBootstrap()
+            .group(bossEventLoopGroup, workerEventLoopGroup)
+            .channel(NioServerSocketChannel.class)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .option(ChannelOption.SO_BACKLOG, backlog)
+            .childOption(ChannelOption.TCP_NODELAY, true)
+            .childOption(ChannelOption.SO_RCVBUF, buffer_size)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .childHandler(new StormServerPipelineFactory(ser, deser, topoConf, this));
 
         // Bind and start to accept incoming connections.
-        Channel channel = bootstrap.bind(new InetSocketAddress(port));
-        boundPort = ((InetSocketAddress) channel.getLocalAddress()).getPort();
-        allChannels.add(channel);
+        try {
+            ChannelFuture bindFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
+            Channel channel = bindFuture.channel();
+            boundPort = ((InetSocketAddress) channel.localAddress()).getPort();
+            allChannels.add(channel);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private void addReceiveCount(String from, int amount) {
@@ -124,18 +134,18 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
      * @throws InterruptedException
      */
     protected void enqueue(List<TaskMessage> msgs, String from) throws InterruptedException {
-        if (null == msgs || msgs.size() == 0 || closing) {
+        if (null == msgs || msgs.isEmpty() || closing) {
             return;
         }
         addReceiveCount(from, msgs.size());
-        if (_cb != null) {
-            _cb.recv(msgs);
+        if (cb != null) {
+            cb.recv(msgs);
         }
     }
 
     @Override
     public void registerRecv(IConnectionCallback cb) {
-        _cb = cb;
+        this.cb = cb;
     }
 
     @Override
@@ -143,14 +153,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         this.newConnectionResponse = newConnectionResponse;
     }
 
-    /**
-     * @param channel channel to close
-     */
-    public void closeChannel(Channel channel) {
-        channel.close().awaitUninterruptibly();
-        allChannels.remove(channel);
-    }
-
     @Override
     public int getPort() {
         return boundPort;
@@ -159,26 +161,25 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     /**
      * close all channels, and release resources
      */
-    public synchronized void close() {
-        if (allChannels != null) {
-            allChannels.close().awaitUninterruptibly();
-            factory.releaseExternalResources();
-            allChannels = null;
-        }
+    @Override
+    public void close() {
+        allChannels.close().awaitUninterruptibly();
+        workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
+        bossEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
     }
 
     @Override
-    synchronized public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+    public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
         MessageBatch mb = new MessageBatch(1);
-        mb.add(new TaskMessage(-1, _ser.serialize(Arrays.asList((Object) taskToLoad))));
-        allChannels.write(mb);
+        mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+        allChannels.writeAndFlush(mb);
     }
 
     // this method expected to be thread safe
     @Override
-    synchronized public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+    public void sendBackPressureStatus(BackPressureStatus bpStatus) {
         LOG.info("Sending BackPressure status update to connected workers. BPStatus = {}", bpStatus);
-        allChannels.write(bpStatus);
+        allChannels.writeAndFlush(bpStatus);
     }
 
     @Override
@@ -187,16 +188,11 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     }
 
     @Override
-    public void send(int task, byte[] message) {
-        throw new UnsupportedOperationException("Server connection should not send any messages");
-    }
-
-    @Override
     public void send(Iterator<TaskMessage> msgs) {
         throw new UnsupportedOperationException("Server connection should not send any messages");
     }
 
-    public String netty_name() {
+    public final String netty_name() {
         return "Netty-server-localhost-" + port;
     }
 
@@ -212,7 +208,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     }
 
     private boolean connectionEstablished(Channel channel) {
-        return channel != null && channel.isBound();
+        return channel != null && channel.isActive();
     }
 
     private boolean connectionEstablished(ChannelGroup allChannels) {
@@ -226,11 +222,12 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         return allEstablished;
     }
 
+    @Override
     public Object getState() {
         LOG.debug("Getting metrics for server on port {}", port);
         HashMap<String, Object> ret = new HashMap<>();
         ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
-        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+        HashMap<String, Integer> enqueued = new HashMap<>();
         Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<String, AtomicInteger> ent = it.next();
@@ -245,8 +242,8 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         ret.put("enqueued", enqueued);
 
         // Report messageSizes metric, if enabled (non-null).
-        if (_cb instanceof IMetric) {
-            Object metrics = ((IMetric) _cb).getValueAndReset();
+        if (cb instanceof IMetric) {
+            Object metrics = ((IMetric) cb).getValueAndReset();
             if (metrics instanceof Map) {
                 ret.put("messageBytes", metrics);
             }
@@ -258,28 +255,32 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     /**
      * Implementing IServer.
      **/
-    public void channelConnected(Channel c) {
+    @Override
+    public void channelActive(Channel c) {
         if (newConnectionResponse != null) {
-            c.write(newConnectionResponse.get()); // not synchronized since it is not yet in channel grp, so pvt to this thread
+            c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
         }
         allChannels.add(c);
     }
 
+    @Override
     public void received(Object message, String remote, Channel channel) throws InterruptedException {
         List<TaskMessage> msgs = (List<TaskMessage>) message;
         enqueue(msgs, remote);
     }
 
+    @Override
     public String name() {
         return (String) topoConf.get(Config.TOPOLOGY_NAME);
     }
 
+    @Override
     public String secretKey() {
         return SaslUtils.getSecretKey(topoConf);
     }
 
+    @Override
     public void authenticated(Channel c) {
-        return;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 9b63346..4c38344 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -18,19 +18,16 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StormClientHandler extends SimpleChannelUpstreamHandler {
+public class StormClientHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
-    private Client client;
-    private KryoValuesDeserializer _des;
-    private AtomicBoolean[] remoteBpStatus;
+    private final Client client;
+    private final KryoValuesDeserializer _des;
+    private final AtomicBoolean[] remoteBpStatus;
 
     StormClientHandler(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) {
         this.client = client;
@@ -39,9 +36,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
         //examine the response message from server
-        Object message = event.getMessage();
         if (message instanceof ControlMessage) {
             ControlMessage msg = (ControlMessage) message;
             if (msg == ControlMessage.FAILURE_RESPONSE) {
@@ -61,16 +57,17 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
             }
             LOG.debug("Received BackPressure status update : {}", status);
         } else if (message instanceof List) {
-            //This should be the metrics, and there should only be one of them
+            //This should be the load metrics.
+            //There will usually only be one message, but if there are multiple we only process the latest one.
             List<TaskMessage> list = (List<TaskMessage>) message;
             if (list.size() < 1) {
                 throw new RuntimeException("Didn't see enough load metrics (" + client.getDstAddress() + ") " + list);
             }
-            TaskMessage tm = ((List<TaskMessage>) message).get(list.size() - 1);
-            if (tm.task() != -1) {
+            TaskMessage tm = list.get(list.size() - 1);
+            if (tm.task() != Server.LOAD_METRICS_TASK_ID) {
                 throw new RuntimeException("Metrics messages are sent to the system task (" + client.getDstAddress() + ") " + tm);
             }
-            List metrics = _des.deserialize(tm.message());
+            List<Object> metrics = _des.deserialize(tm.message());
             if (metrics.size() < 1) {
                 throw new RuntimeException("No metrics data in the metrics message (" + client.getDstAddress() + ") " + metrics);
             }
@@ -85,13 +82,7 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        client.notifyInterestChanged(e.getChannel());
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable cause = event.getCause();
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         if (!(cause instanceof ConnectException)) {
             LOG.info("Connection to " + client.getDstAddress() + " failed:", cause);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
index 1808d3e..2a790ef 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
@@ -15,14 +15,14 @@ package org.apache.storm.messaging.netty;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.Config;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
 
-class StormClientPipelineFactory implements ChannelPipelineFactory {
-    private Client client;
-    private AtomicBoolean[] remoteBpStatus;
-    private Map<String, Object> conf;
+class StormClientPipelineFactory extends ChannelInitializer<Channel> {
+    private final Client client;
+    private final AtomicBoolean[] remoteBpStatus;
+    private final Map<String, Object> conf;
 
     StormClientPipelineFactory(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) {
         this.client = client;
@@ -30,14 +30,15 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         this.conf = conf;
     }
 
-    public ChannelPipeline getPipeline() throws Exception {
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
         // Create a default pipeline implementation.
-        ChannelPipeline pipeline = Channels.pipeline();
+        ChannelPipeline pipeline = ch.pipeline();
 
         // Decoder
         pipeline.addLast("decoder", new MessageDecoder(client.deser));
         // Encoder
-        pipeline.addLast("encoder", new MessageEncoder(client.ser));
+        pipeline.addLast("encoder", NettySerializableMessageEncoder.INSTANCE);
 
         boolean isNettyAuth = (Boolean) conf
             .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
@@ -48,6 +49,5 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         }
         // business logic.
         pipeline.addLast("handler", new StormClientHandler(client, remoteBpStatus, conf));
-        return pipeline;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 7138f94..55e7058 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -17,22 +17,18 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StormServerHandler extends SimpleChannelUpstreamHandler {
+public class StormServerHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
-    private static final Set<Class> allowedExceptions = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
-    IServer server;
-    private AtomicInteger failure_count;
-    private Channel channel;
+    private static final Set<Class> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
+    private final IServer server;
+    private final AtomicInteger failure_count;
 
     public StormServerHandler(IServer server) {
         this.server = server;
@@ -40,40 +36,35 @@ public class StormServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-        server.channelConnected(e.getChannel());
-        if (channel != null) {
-            LOG.debug("Replacing channel with new channel: {} -> ",
-                      channel, e.getChannel());
-        }
-        channel = e.getChannel();
-        server.channelConnected(channel);
+    public void channelActive(ChannelHandlerContext ctx) {
+        server.channelActive(ctx.channel());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msgs = e.getMessage();
-        if (msgs == null) {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg == null) {
             return;
         }
 
+        Channel channel = ctx.channel();
         try {
-            server.received(msgs, e.getRemoteAddress().toString(), channel);
-        } catch (InterruptedException e1) {
+            server.received(msg, channel.remoteAddress().toString(), channel);
+        } catch (InterruptedException e) {
             LOG.info("failed to enqueue a request message", e);
             failure_count.incrementAndGet();
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         try {
-            LOG.error("server errors in handling the request", e.getCause());
+            LOG.error("server errors in handling the request", cause);
         } catch (Throwable err) {
             // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
         }
         try {
-            Utils.handleUncaughtException(e.getCause(), allowedExceptions);
+            Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+            ctx.close();
         } catch (Error error) {
             LOG.info("Received error in netty thread.. terminating server...");
             Runtime.getRuntime().exit(1);

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
index 068843f..6e22fb7 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -12,28 +13,41 @@
 
 package org.apache.storm.messaging.netty;
 
+import java.util.Map;
 import org.apache.storm.Config;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-
-class StormServerPipelineFactory implements ChannelPipelineFactory {
-    private Server server;
-
-    StormServerPipelineFactory(Server server) {
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
+
+class StormServerPipelineFactory extends ChannelInitializer<Channel> {
+
+    private final KryoValuesSerializer ser;
+    private final KryoValuesDeserializer deser;
+    private final Map<String, Object> topoConf;
+    private final Server server;
+
+    StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
+        Map<String, Object> topoConf, Server server) {
+        this.ser = ser;
+        this.deser = deser;
+        this.topoConf = topoConf;
         this.server = server;
     }
 
-    public ChannelPipeline getPipeline() throws Exception {
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
         // Create a default pipeline implementation.
-        ChannelPipeline pipeline = Channels.pipeline();
+        ChannelPipeline pipeline = ch.pipeline();
 
         // Decoder
-        pipeline.addLast("decoder", new MessageDecoder(server.deser));
-        // Encoder
-        pipeline.addLast("encoder", new MessageEncoder(server._ser));
+        pipeline.addLast("decoder", new MessageDecoder(deser));
+        // Encoders
+        pipeline.addLast("netty-serializable-encoder", NettySerializableMessageEncoder.INSTANCE);
+        pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(ser));
 
-        boolean isNettyAuth = (Boolean) this.server.topoConf
+        boolean isNettyAuth = (Boolean) topoConf
             .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         if (isNettyAuth) {
             // Authenticate: Removed after authentication completes
@@ -41,11 +55,9 @@ class StormServerPipelineFactory implements ChannelPipelineFactory {
                 server));
             // Authorize
             pipeline.addLast("authorizeServerHandler",
-                             new SaslStormServerAuthorizeHandler());
+                new SaslStormServerAuthorizeHandler());
         }
         // business logic.
         pipeline.addLast("handler", new StormServerHandler(server));
-
-        return pipeline;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index ccaa0f8..c24d8db 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -16,7 +16,6 @@ import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,10 +27,14 @@ import org.apache.storm.messaging.netty.ISaslClient;
 import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
 import org.apache.storm.pacemaker.codec.ThriftNettyClientCodec;
 import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ClientBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.Bootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +43,8 @@ public class PacemakerClient implements ISaslClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(PacemakerClient.class);
     private static Timer timer = new Timer(true);
-    private final ClientBootstrap bootstrap;
+    private final Bootstrap bootstrap;
+    private final EventLoopGroup workerEventLoopGroup;
     private String client_name;
     private String secret;
     private AtomicBoolean ready;
@@ -56,11 +60,6 @@ public class PacemakerClient implements ISaslClient {
     private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
     private int retryTimes = 0;
 
-    //the constructor is invoked by pacemaker-state-factory-test
-    public PacemakerClient() {
-        bootstrap = new ClientBootstrap();
-    }
-
     public PacemakerClient(Map<String, Object> config, String host) {
 
         int port = (int) config.get(Config.PACEMAKER_PORT);
@@ -68,9 +67,9 @@ public class PacemakerClient implements ISaslClient {
         if (client_name == null) {
             client_name = "pacemaker-client";
         }
+        int maxWorkers = (int)config.get(Config.PACEMAKER_CLIENT_MAX_THREADS);
 
         String auth = (String) config.get(Config.PACEMAKER_AUTH_METHOD);
-        ThriftNettyClientCodec.AuthMethod authMethod;
 
         switch (auth) {
 
@@ -100,23 +99,25 @@ public class PacemakerClient implements ISaslClient {
 
         ready = new AtomicBoolean(false);
         shutdown = new AtomicBoolean(false);
-        channelRef = new AtomicReference<Channel>(null);
+        channelRef = new AtomicReference<>(null);
         setupMessaging();
 
-        ThreadFactory bossFactory = new NettyRenameThreadFactory("client-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client-worker");
-        NioClientSocketChannelFactory factory =
-            new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
-        bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", 5242880);
-        bootstrap.setOption("keepAlive", true);
+        // A thread count of 0 makes Netty fall back to DEFAULT_EVENT_LOOP_THREADS, so non-positive settings are mapped to 0. See:
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
+        int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
+        bootstrap = new Bootstrap()
+            .group(workerEventLoopGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_SNDBUF, 5242880)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .handler(new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize));
 
         remote_addr = new InetSocketAddress(host, port);
-        int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
-        ChannelPipelineFactory pipelineFactory =
-            new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize).pipelineFactory();
-        bootstrap.setPipelineFactory(pipelineFactory);
         bootstrap.connect(remote_addr);
     }
 
@@ -129,27 +130,16 @@ public class PacemakerClient implements ISaslClient {
     }
 
     @Override
-    public synchronized void channelConnected(Channel channel) {
+    public synchronized void channelReady(Channel channel) {
         Channel oldChannel = channelRef.get();
         if (oldChannel != null) {
             LOG.debug("Closing oldChannel is connected: {}", oldChannel.toString());
             close_channel();
         }
 
-        LOG.debug("Channel is connected: {}", channel.toString());
         channelRef.set(channel);
-
-        //If we're not going to authenticate, we can begin sending.
-        if (authMethod == ThriftNettyClientCodec.AuthMethod.NONE) {
-            ready.set(true);
-            this.notifyAll();
-        }
         retryTimes = 0;
-    }
-
-    @Override
-    public synchronized void channelReady() {
-        LOG.debug("Channel is ready.");
+        LOG.debug("Channel is ready: {}", channel.toString());
         ready.set(true);
         this.notifyAll();
     }
@@ -176,7 +166,7 @@ public class PacemakerClient implements ISaslClient {
                     waitUntilReady();
                     Channel channel = channelRef.get();
                     if (channel != null) {
-                        channel.write(m);
+                        channel.writeAndFlush(m, channel.voidPromise());
                         m.wait(1000);
                     }
                     if (messages[next] != m && messages[next] != null) {
@@ -257,7 +247,7 @@ public class PacemakerClient implements ISaslClient {
 
     public void shutdown() {
         shutdown.set(true);
-        bootstrap.releaseExternalResources();
+        workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
     }
 
     private synchronized void close_channel() {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
index 49525db..b81f02d 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
@@ -15,57 +15,49 @@ package org.apache.storm.pacemaker;
 import java.net.ConnectException;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.messaging.netty.ControlMessage;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PacemakerClientHandler extends SimpleChannelUpstreamHandler {
+public class PacemakerClientHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(PacemakerClientHandler.class);
 
-    private PacemakerClient client;
+    private final PacemakerClient client;
 
     public PacemakerClientHandler(PacemakerClient client) {
         this.client = client;
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-                                 ChannelStateEvent event) {
+    public void channelActive(ChannelHandlerContext ctx) {
         // register the newly established channel
-        Channel channel = ctx.getChannel();
-        client.channelConnected(channel);
-
+        Channel channel = ctx.channel();
         LOG.info("Connection established from {} to {}",
-                 channel.getLocalAddress(), channel.getRemoteAddress());
+                 channel.localAddress(), channel.remoteAddress());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
-        LOG.debug("Got Message: {}", event.getMessage().toString());
-        Object evm = event.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object message) {
+        LOG.debug("Got Message: {}", message.toString());
 
-        if (evm instanceof ControlMessage) {
-            LOG.debug("Got control message: {}", evm.toString());
+        if (message instanceof ControlMessage) {
+            LOG.debug("Got control message: {}", message.toString());
             return;
-        } else if (evm instanceof HBMessage) {
-            client.gotMessage((HBMessage) evm);
+        } else if (message instanceof HBMessage) {
+            client.gotMessage((HBMessage) message);
         } else {
-            LOG.warn("Got unexpected message: {} from server.", evm);
+            LOG.warn("Got unexpected message: {} from server.", message);
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable t = event.getCause();
-        if (t instanceof ConnectException) {
-            LOG.warn("Connection to pacemaker failed. Trying to reconnect {}", t.getMessage());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof ConnectException) {
+            LOG.warn("Connection to pacemaker failed. Trying to reconnect {}", cause.getMessage());
         } else {
-            LOG.error("Exception occurred in Pacemaker.", t);
+            LOG.error("Exception occurred in Pacemaker.", cause);
         }
         client.reconnect();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
index 2d9d5c9..c88ecf2 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
@@ -13,17 +13,17 @@
 package org.apache.storm.pacemaker.codec;
 
 import java.io.IOException;
+import java.util.List;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.messaging.netty.ControlMessage;
 import org.apache.storm.messaging.netty.SaslMessageToken;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.storm.utils.Utils;
 
-public class ThriftDecoder extends FrameDecoder {
+public class ThriftDecoder extends ByteToMessageDecoder {
 
     private static final int INTEGER_SIZE = 4;
 
@@ -40,11 +40,10 @@ public class ThriftDecoder extends FrameDecoder {
     }
 
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
-
+    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) throws Exception {
         long available = buf.readableBytes();
         if (available < INTEGER_SIZE) {
-            return null;
+            return;
         }
 
         buf.markReaderIndex();
@@ -61,7 +60,7 @@ public class ThriftDecoder extends FrameDecoder {
         if (available < thriftLen) {
             // We haven't received the entire object yet, return and wait for more bytes.
             buf.resetReaderIndex();
-            return null;
+            return;
         }
 
         byte serialized[] = new byte[thriftLen];
@@ -70,12 +69,12 @@ public class ThriftDecoder extends FrameDecoder {
 
         if (m.get_type() == HBServerMessageType.CONTROL_MESSAGE) {
             ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());
-            return cm;
+            out.add(cm);
         } else if (m.get_type() == HBServerMessageType.SASL_MESSAGE_TOKEN) {
             SaslMessageToken sm = SaslMessageToken.read(m.get_data().get_message_blob());
-            return sm;
+            out.add(sm);
         } else {
-            return m;
+            out.add(m);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
index d726159..9dffd04 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
@@ -12,59 +12,56 @@
 
 package org.apache.storm.pacemaker.codec;
 
-import java.io.IOException;
+import java.util.List;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBMessageData;
 import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.messaging.netty.ControlMessage;
 import org.apache.storm.messaging.netty.INettySerializable;
 import org.apache.storm.messaging.netty.SaslMessageToken;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.ByteBufAllocator;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ThriftEncoder extends OneToOneEncoder {
+public class ThriftEncoder extends MessageToMessageEncoder<Object> {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(ThriftEncoder.class);
 
-    private HBMessage encodeNettySerializable(INettySerializable netty_message,
-                                              HBServerMessageType mType) {
+    private HBMessage encodeNettySerializable(ByteBufAllocator alloc,
+        INettySerializable netty_message, HBServerMessageType mType) {
 
         HBMessageData message_data = new HBMessageData();
         HBMessage m = new HBMessage();
+        byte[] messageBuffer = new byte[netty_message.encodeLength()];
+        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(messageBuffer);
         try {
-            ChannelBuffer cbuffer = netty_message.buffer();
-            if (cbuffer.hasArray()) {
-                message_data.set_message_blob(cbuffer.array());
-            } else {
-                byte buff[] = new byte[netty_message.encodeLength()];
-                cbuffer.readBytes(buff, 0, netty_message.encodeLength());
-                message_data.set_message_blob(buff);
-            }
+            netty_message.write(wrappedBuffer);
+            
+            message_data.set_message_blob(messageBuffer);
             m.set_type(mType);
             m.set_data(message_data);
             return m;
-        } catch (IOException e) {
-            LOG.error("Failed to encode NettySerializable: ", e);
-            throw new RuntimeException(e);
+        } finally {
+            wrappedBuffer.release();
         }
     }
 
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
+    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> out) throws Exception {
         if (msg == null) {
-            return null;
+            return;
         }
 
         LOG.debug("Trying to encode: " + msg.getClass().toString() + " : " + msg.toString());
 
         HBMessage m;
+        ByteBufAllocator alloc = channelHandlerContext.alloc();
         if (msg instanceof INettySerializable) {
             INettySerializable nettyMsg = (INettySerializable) msg;
 
@@ -77,19 +74,19 @@ public class ThriftEncoder extends OneToOneEncoder {
                 LOG.error("Didn't recognise INettySerializable: " + nettyMsg.toString());
                 throw new RuntimeException("Unrecognized INettySerializable.");
             }
-            m = encodeNettySerializable(nettyMsg, type);
+            m = encodeNettySerializable(alloc, nettyMsg, type);
         } else {
             m = (HBMessage) msg;
         }
 
         try {
             byte serialized[] = Utils.thriftSerialize(m);
-            ChannelBuffer ret = ChannelBuffers.directBuffer(serialized.length + 4);
+            ByteBuf ret = alloc.ioBuffer(serialized.length + 4);
 
             ret.writeInt(serialized.length);
             ret.writeBytes(serialized);
 
-            return ret;
+            out.add(ret);
         } catch (RuntimeException e) {
             LOG.error("Failed to serialize.", e);
             throw e;

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
index 853ce74..00e1bc9 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
@@ -19,25 +19,24 @@ import org.apache.storm.messaging.netty.SaslStormClientHandler;
 import org.apache.storm.pacemaker.PacemakerClient;
 import org.apache.storm.pacemaker.PacemakerClientHandler;
 import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ThriftNettyClientCodec {
+public class ThriftNettyClientCodec extends ChannelInitializer<Channel> {
 
     public static final String SASL_HANDLER = "sasl-handler";
     public static final String KERBEROS_HANDLER = "kerberos-handler";
     private static final Logger LOG = LoggerFactory
         .getLogger(ThriftNettyClientCodec.class);
 
-    ;
     private final int thriftMessageMaxSize;
-    private PacemakerClient client;
-    private AuthMethod authMethod;
-    private Map<String, Object> topoConf;
-    private String host;
+    private final PacemakerClient client;
+    private final AuthMethod authMethod;
+    private final Map<String, Object> topoConf;
+    private final String host;
 
     public ThriftNettyClientCodec(PacemakerClient pacemaker_client, Map<String, Object> topoConf,
                                   AuthMethod authMethod, String host, int thriftMessageMaxSizeBytes) {
@@ -48,10 +47,9 @@ public class ThriftNettyClientCodec {
         thriftMessageMaxSize = thriftMessageMaxSizeBytes;
     }
 
-    public ChannelPipelineFactory pipelineFactory() {
-        return new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() {
-                ChannelPipeline pipeline = Channels.pipeline();
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
                 pipeline.addLast("encoder", new ThriftEncoder());
                 pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize));
 
@@ -74,13 +72,10 @@ public class ThriftNettyClientCodec {
                         throw new RuntimeException(e);
                     }
                 } else {
-                    client.channelReady();
+            client.channelReady(ch);
                 }
 
                 pipeline.addLast("PacemakerClientHandler", new PacemakerClientHandler(client));
-                return pipeline;
-            }
-        };
     }
 
     public enum AuthMethod {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
index 3d41766..540d691 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -17,6 +18,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Stream;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
@@ -26,7 +28,7 @@ import org.slf4j.LoggerFactory;
 public class TransferDrainer {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
-    private Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap();
+    private final Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap<>();
 
     // Cache the msgs grouped by destination node
     public void add(TaskMessage taskMsg) {
@@ -40,26 +42,24 @@ public class TransferDrainer {
     }
 
     public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
-        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+        HashMap<NodeInfo, Stream<TaskMessage>> bundleMapByDestination = groupBundleByDestination(taskToNode);
 
-        for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
+        for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry : bundleMapByDestination.entrySet()) {
             NodeInfo node = entry.getKey();
             IConnection conn = connections.get(node);
             if (conn != null) {
-                ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
-                Iterator<TaskMessage> iter = getBundleIterator(bundle);
-                if (null != iter && iter.hasNext()) {
+                Iterator<TaskMessage> iter = entry.getValue().iterator();
+                if (iter.hasNext()) {
                     conn.send(iter);
                 }
-                entry.getValue().clear();
             } else {
                 LOG.warn("Connection not available for hostPort {}", node);
             }
         }
     }
 
-    private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
-        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new HashMap<>();
+    private HashMap<NodeInfo, Stream<TaskMessage>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
+        HashMap<NodeInfo, Stream<TaskMessage>> result = new HashMap<>();
 
         for (Entry<Integer, ArrayList<TaskMessage>> entry : bundles.entrySet()) {
             if (entry.getValue().isEmpty()) {
@@ -67,12 +67,7 @@ public class TransferDrainer {
             }
             NodeInfo node = taskToNode.get(entry.getKey());
             if (node != null) {
-                ArrayList<ArrayList<TaskMessage>> msgs = result.get(node);
-                if (msgs == null) {
-                    msgs = new ArrayList<>();
-                    result.put(node, msgs);
-                }
-                msgs.add(entry.getValue());
+                result.merge(node, entry.getValue().stream(), Stream::concat);
             } else {
                 LOG.warn("No remote destination available for task {}", entry.getKey());
             }
@@ -80,54 +75,6 @@ public class TransferDrainer {
         return result;
     }
 
-    private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-
-        if (null == bundle) {
-            return null;
-        }
-
-        return new Iterator<TaskMessage>() {
-
-            private int offset = 0;
-            private int size = 0;
-            private int bundleOffset = 0;
-            private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-
-            {
-                for (ArrayList<TaskMessage> list : bundle) {
-                    size += list.size();
-                }
-            }
-
-            @Override
-            public boolean hasNext() {
-                return offset < size;
-            }
-
-            @Override
-            public TaskMessage next() {
-                TaskMessage msg;
-                if (iter.hasNext()) {
-                    msg = iter.next();
-                } else {
-                    bundleOffset++;
-                    iter = bundle.get(bundleOffset).iterator();
-                    msg = iter.next();
-                }
-                if (null != msg) {
-                    offset++;
-                }
-                return msg;
-            }
-
-            @Override
-            public void remove() {
-                throw new RuntimeException("not supported");
-            }
-        };
-    }
-
-
     public void clear() {
         for (ArrayList<TaskMessage> taskMessages : bundles.values()) {
             taskMessages.clear();

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
index f4031f1..39db4ef 100644
--- a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
+++ b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.storm;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,18 +34,30 @@ import org.apache.storm.generated.HBPulse;
 import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.pacemaker.PacemakerClient;
 import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.pacemaker.PacemakerConnectionException;
 import org.apache.storm.utils.Utils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class PaceMakerStateStorageFactoryTest {
-    private PaceMakerClientProxy clientProxy;
+    
+    @Captor
+    private ArgumentCaptor<HBMessage> hbMessageCaptor;
+    
+    @Mock
+    private PacemakerClient clientMock;
     private PacemakerClientPoolProxy clientPoolProxy;
     private PaceMakerStateStorage stateStorage;
 
     public void createPaceMakerStateStorage(HBServerMessageType messageType, HBMessageData messageData) throws Exception {
         HBMessage response = new HBMessage(messageType, messageData);
-        clientProxy = new PaceMakerClientProxy(response, null);
+        when(clientMock.send(any())).thenReturn(response);
         clientPoolProxy = new PacemakerClientPoolProxy();
         stateStorage = new PaceMakerStateStorage(clientPoolProxy, null);
     }
@@ -50,7 +66,8 @@ public class PaceMakerStateStorageFactoryTest {
     public void testSetWorkerHb() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.SEND_PULSE_RESPONSE, null);
         stateStorage.set_worker_hb("/foo", Utils.javaSerialize("data"), null);
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         HBPulse pulse = sent.get_data().get_pulse();
         Assert.assertEquals(HBServerMessageType.SEND_PULSE, sent.get_type());
         Assert.assertEquals("/foo", pulse.get_id());
@@ -67,7 +84,8 @@ public class PaceMakerStateStorageFactoryTest {
     public void testDeleteWorkerHb() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.DELETE_PATH_RESPONSE, null);
         stateStorage.delete_worker_hb("/foo/bar");
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         Assert.assertEquals(HBServerMessageType.DELETE_PATH, sent.get_type());
         Assert.assertEquals("/foo/bar", sent.get_data().get_path());
     }
@@ -86,7 +104,8 @@ public class PaceMakerStateStorageFactoryTest {
         hbPulse.set_details(Utils.serialize(cwh));
         createPaceMakerStateStorage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
         stateStorage.get_worker_hb("/foo", false);
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         Assert.assertEquals(HBServerMessageType.GET_PULSE, sent.get_type());
         Assert.assertEquals("/foo", sent.get_data().get_path());
     }
@@ -107,7 +126,8 @@ public class PaceMakerStateStorageFactoryTest {
     public void testGetWorkerHbChildren() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE, HBMessageData.nodes(new HBNodes()));
         stateStorage.get_worker_hb_children("/foo", false);
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         Assert.assertEquals(HBServerMessageType.GET_ALL_NODES_FOR_PATH, sent.get_type());
         Assert.assertEquals("/foo", sent.get_data().get_path());
     }
@@ -118,42 +138,22 @@ public class PaceMakerStateStorageFactoryTest {
         stateStorage.get_worker_hb_children("/foo", false);
     }
 
-    private class PaceMakerClientProxy extends PacemakerClient {
-        private HBMessage response;
-        private HBMessage captured;
-
-        public PaceMakerClientProxy(HBMessage response, HBMessage captured) {
-            this.response = response;
-            this.captured = captured;
-        }
-
-        @Override
-        public HBMessage send(HBMessage m) {
-            captured = m;
-            return response;
-        }
-
-        public HBMessage checkCaptured() {
-            return captured;
-        }
-    }
-
     private class PacemakerClientPoolProxy extends PacemakerClientPool {
         public PacemakerClientPoolProxy() {
             super(new HashMap<>());
         }
 
         public PacemakerClient getWriteClient() {
-            return clientProxy;
+            return clientMock;
         }
 
-        public HBMessage send(HBMessage m) {
-            return clientProxy.send(m);
+        public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException {
+            return clientMock.send(m);
         }
 
-        public List<HBMessage> sendAll(HBMessage m) {
+        public List<HBMessage> sendAll(HBMessage m) throws PacemakerConnectionException, InterruptedException {
             List<HBMessage> response = new ArrayList<>();
-            response.add(clientProxy.send(m));
+            response.add(clientMock.send(m));
             return response;
         }
     }