You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/09 11:37:47 UTC

[4/5] storm git commit: STORM-1038: Upgrade to latest Netty

STORM-1038: Upgrade to latest Netty

Use manually allocated heap buffer for thrift encoder
Fix flaky test
Set correct checkstyle violations
Whitespace


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e2308285
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e2308285
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e2308285

Branch: refs/heads/master
Commit: e2308285d28a2df4a1435a1f4fb304630749c29a
Parents: d293dd3
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri May 11 16:55:35 2018 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:34:51 2018 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 -
 pom.xml                                         |  18 +--
 shaded-deps/pom.xml                             |   8 +-
 storm-client/pom.xml                            |   2 +-
 .../src/jvm/org/apache/storm/Config.java        |   8 +
 .../org/apache/storm/daemon/worker/Worker.java  |   9 +-
 .../org/apache/storm/messaging/IConnection.java |   8 -
 .../apache/storm/messaging/local/Context.java   |  19 ---
 .../messaging/netty/BackPressureStatus.java     |  11 +-
 .../netty/BackPressureStatusEncoder.java        |  37 +++++
 .../apache/storm/messaging/netty/Client.java    | 158 ++++++++-----------
 .../apache/storm/messaging/netty/Context.java   |  31 ++--
 .../storm/messaging/netty/ControlMessage.java   |  32 ++--
 .../messaging/netty/INettySerializable.java     |   9 +-
 .../storm/messaging/netty/ISaslClient.java      |   6 +-
 .../storm/messaging/netty/ISaslServer.java      |   2 +-
 .../apache/storm/messaging/netty/IServer.java   |   6 +-
 .../netty/KerberosSaslClientHandler.java        | 107 ++++++-------
 .../netty/KerberosSaslNettyClientState.java     |   9 +-
 .../netty/KerberosSaslNettyServerState.java     |   9 +-
 .../netty/KerberosSaslServerHandler.java        |  53 +++----
 .../storm/messaging/netty/MessageBatch.java     |  40 +++--
 .../storm/messaging/netty/MessageDecoder.java   |  53 ++++---
 .../storm/messaging/netty/MessageEncoder.java   |  50 ------
 .../netty/NettyRenameThreadFactory.java         |  18 +--
 .../netty/NettySerializableMessageEncoder.java  |  37 +++++
 .../storm/messaging/netty/SaslMessageToken.java |  32 ++--
 .../messaging/netty/SaslNettyClientState.java   |   9 +-
 .../messaging/netty/SaslNettyServerState.java   |   9 +-
 .../messaging/netty/SaslStormClientHandler.java | 114 ++++++-------
 .../netty/SaslStormServerAuthorizeHandler.java  |  22 +--
 .../messaging/netty/SaslStormServerHandler.java |  62 ++++----
 .../apache/storm/messaging/netty/Server.java    | 145 ++++++++---------
 .../messaging/netty/StormClientHandler.java     |  35 ++--
 .../netty/StormClientPipelineFactory.java       |  22 +--
 .../messaging/netty/StormServerHandler.java     |  45 +++---
 .../netty/StormServerPipelineFactory.java       |  46 ++++--
 .../apache/storm/pacemaker/PacemakerClient.java |  68 ++++----
 .../storm/pacemaker/PacemakerClientHandler.java |  46 +++---
 .../storm/pacemaker/codec/ThriftDecoder.java    |  23 ++-
 .../storm/pacemaker/codec/ThriftEncoder.java    |  47 +++---
 .../pacemaker/codec/ThriftNettyClientCodec.java |  29 ++--
 .../org/apache/storm/utils/TransferDrainer.java |  73 ++-------
 .../storm/PaceMakerStateStorageFactoryTest.java |  62 ++++----
 storm-core/pom.xml                              | 134 ++++++++--------
 .../apache/storm/messaging/netty/NettyTest.java |  41 +++--
 storm-server/pom.xml                            |   2 +-
 .../storm/pacemaker/IServerMessageHandler.java  |   2 +-
 .../apache/storm/pacemaker/PacemakerServer.java | 108 +++++++------
 .../pacemaker/codec/ThriftNettyServerCodec.java |  34 ++--
 50 files changed, 905 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4b3b1d0..f544903 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -226,8 +226,6 @@ storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
-io.netty.noPreferDirect: false
-io.netty.allocator.type: "pooled"
 
 # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
 storm.messaging.netty.transfer.batch.size: 262144

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3da30fc..009d0f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,7 +275,7 @@
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>16.0.1</guava.version>
         <auto-service.version>1.0-rc3</auto-service.version>
-        <netty.version>4.0.33.Final</netty.version>
+        <netty.version>4.1.25.Final</netty.version>
         <sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.8.2</log4j.version>
@@ -643,7 +643,7 @@
                 <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
             </properties>
         </profile> 
-	<profile>
+        <profile>
             <id>integration-tests-only</id>
             <properties>
                 <!--Java-->
@@ -937,12 +937,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-                <dependency>
-                    <groupId>com.google.auto.service</groupId>
-                    <artifactId>auto-service</artifactId>
-                    <version>${auto-service.version}</version>
-                    <optional>true</optional>
-                </dependency>
+            <dependency>
+                <groupId>com.google.auto.service</groupId>
+                <artifactId>auto-service</artifactId>
+                <version>${auto-service.version}</version>
+                <optional>true</optional>
+            </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>
@@ -1063,7 +1063,7 @@
                 <groupId>org.apache.calcite</groupId>
                 <artifactId>calcite-core</artifactId>
                 <version>${calcite.version}</version>
-            </dependency> 
+            </dependency>
 
             <!-- kafka artifact dependency needed for storm-kafka -->
             <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/shaded-deps/pom.xml
----------------------------------------------------------------------
diff --git a/shaded-deps/pom.xml b/shaded-deps/pom.xml
index 8b33c70..97ecebd 100644
--- a/shaded-deps/pom.xml
+++ b/shaded-deps/pom.xml
@@ -112,7 +112,7 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+            <artifactId>netty-all</artifactId>
             <optional>true</optional>
         </dependency>
         <dependency>
@@ -185,7 +185,9 @@
                             <include>commons-collections:commons-collections</include>
                             <include>commons-io:commons-io</include>
                             <include>commons-lang:commons-lang</include>
+                            <!-- Pulled in by Zookeeper -->
                             <include>io.netty:netty</include>
+                            <include>io.netty:netty-all</include>
                             <include>org.apache.curator:*</include>
                             <include>org.apache.httpcomponents:httpclient</include>
                             <include>org.apache.thrift:*</include>
@@ -252,6 +254,10 @@
                             <shadedPattern>org.apache.storm.shade.org.jboss.netty</shadedPattern>
                         </relocation>
                         <relocation>
+                            <pattern>io.netty</pattern>
+                            <shadedPattern>org.apache.storm.shade.io.netty</shadedPattern>
+                        </relocation>
+                        <relocation>
                             <pattern>org.jgrapht</pattern>
                             <shadedPattern>org.apache.storm.shade.org.jgrapht</shadedPattern>
                         </relocation>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index a13359d..2bf0ed1 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -172,7 +172,7 @@
                 <!--Note - the version would be inherited-->
                 <configuration>
                     <excludes>**/generated/**</excludes>
-                    <maxAllowedViolations>3298</maxAllowedViolations>
+                    <maxAllowedViolations>3285</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index db809f6..8dfcee9 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -875,6 +875,14 @@ public class Config extends HashMap<String, Object> {
     @isPositiveNumber
     public static final String PACEMAKER_PORT = "pacemaker.port";
     /**
+     * The maximum number of threads that should be used by the Pacemaker client.
+     * When Pacemaker gets loaded it will spawn new threads, up to
+     * this many total, to handle the load.
+     */
+    @isNumber
+    @isPositiveNumber
+    public static final String PACEMAKER_CLIENT_MAX_THREADS = "pacemaker.client.max.threads";
+    /**
      * This should be one of "DIGEST", "KERBEROS", or "NONE" Determines the mode of authentication the pacemaker server and client use. The
      * client must either match the server, or be NONE. In the case of NONE, no authentication is performed for the client, and if the
      * server is running with DIGEST or KERBEROS, the client can only write to the server (no reads). This is intended to provide a

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 1a7869e..233dfac 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -455,13 +455,9 @@ public class Worker implements Shutdownable, DaemonCommon {
             }
             LOG.info("Shut down executors");
 
-            // this is fine because the only time this is shared is when it's a local context,
-            // in which case it's a noop
-            workerState.mqContext.term();
             LOG.info("Shutting down transfer thread");
             workerState.haltWorkerTransfer();
 
-
             if (transferThread != null) {
                 transferThread.interrupt();
                 transferThread.join();
@@ -479,6 +475,11 @@ public class Worker implements Shutdownable, DaemonCommon {
             workerState.resetLogLevelsTimer.close();
             workerState.flushTupleTimer.close();
             workerState.backPressureCheckTimer.close();
+            
+            // this is fine because the only time this is shared is when it's a local context,
+            // in which case it's a noop
+            workerState.mqContext.term();
+            
             workerState.closeResources();
 
             StormMetricRegistry.stop();

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index 93c8e8f..c2e156c 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -48,14 +48,6 @@ public interface IConnection extends AutoCloseable {
     void sendBackPressureStatus(BackPressureStatus bpStatus);
 
     /**
-     * send a message with taskId and payload
-     *
-     * @param taskId  task ID
-     * @param payload
-     */
-    void send(int taskId, byte[] payload);
-
-    /**
      * send batch messages
      *
      * @param msgs

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index e49ac11..6071cbe 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -39,8 +39,6 @@ public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
     private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
 
-    ;
-
     private static LocalServer getLocalServer(String nodeId, int port) {
         String key = nodeId + "-" + port;
         LocalServer ret = _registry.get(key);
@@ -96,11 +94,6 @@ public class Context implements IContext {
         }
 
         @Override
-        public void send(int taskId, byte[] payload) {
-            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
-        }
-
-        @Override
         public void send(Iterator<TaskMessage> msgs) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
         }
@@ -190,18 +183,6 @@ public class Context implements IContext {
         }
 
         @Override
-        public void send(int taskId, byte[] payload) {
-            TaskMessage message = new TaskMessage(taskId, payload);
-            IConnectionCallback serverCb = _server._cb;
-            if (serverCb != null) {
-                flushPending();
-                serverCb.recv(Arrays.asList(message));
-            } else {
-                _pendingDueToUnregisteredServer.add(message);
-            }
-        }
-
-        @Override
         public void send(Iterator<TaskMessage> msgs) {
             IConnectionCallback serverCb = _server._cb;
             if (serverCb != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
index 4798445..5ead740 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
@@ -23,8 +23,8 @@ import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.ByteBufAllocator;
 
 // Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
 public class BackPressureStatus {
@@ -42,6 +42,9 @@ public class BackPressureStatus {
         this.id = bpCount.incrementAndGet();
     }
 
+    /**
+     * Constructor.
+     */
     public BackPressureStatus(String workerId, Collection<Integer> bpTasks, Collection<Integer> nonBpTasks) {
         this.workerId = workerId;
         this.id = bpCount.incrementAndGet();
@@ -61,9 +64,9 @@ public class BackPressureStatus {
     /**
      * Encoded as -600 ... short(2) len ... int(4) payload ... byte[]     *
      */
-    public ChannelBuffer buffer(KryoValuesSerializer ser) throws IOException {
+    public ByteBuf buffer(ByteBufAllocator alloc, KryoValuesSerializer ser) throws IOException {
         byte[] serializedBytes = ser.serializeObject(this);
-        ChannelBuffer buff = ChannelBuffers.buffer(SIZE_OF_ID + SIZE_OF_INT + serializedBytes.length);
+        ByteBuf buff = alloc.ioBuffer(SIZE_OF_ID + SIZE_OF_INT + serializedBytes.length);
         buff.writeShort(IDENTIFIER);
         buff.writeInt(serializedBytes.length);
         buff.writeBytes(serializedBytes);

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java
new file mode 100644
index 0000000..be4fff9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.util.List;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToMessageEncoder;
+
+public class BackPressureStatusEncoder extends MessageToMessageEncoder<BackPressureStatus> {
+
+    private final KryoValuesSerializer ser;
+
+    public BackPressureStatusEncoder(KryoValuesSerializer ser) {
+        this.ser = ser;
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, BackPressureStatus msg, List<Object> out) throws Exception {
+        out.add(msg.buffer(ctx.alloc(), ser));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 91f70af..1ecdd26 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -12,14 +12,14 @@
 
 package org.apache.storm.messaging.netty;
 
+import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
@@ -39,44 +39,51 @@ import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
 import org.apache.storm.policy.WaitStrategyProgressive;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ClientBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFuture;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.storm.shade.org.jboss.netty.util.HashedWheelTimer;
-import org.apache.storm.shade.org.jboss.netty.util.Timeout;
-import org.apache.storm.shade.org.jboss.netty.util.TimerTask;
+import org.apache.storm.shade.io.netty.bootstrap.Bootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelFuture;
+import org.apache.storm.shade.io.netty.channel.ChannelFutureListener;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
+import org.apache.storm.shade.io.netty.util.Timeout;
+import org.apache.storm.shade.io.netty.util.TimerTask;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState;
-
 /**
  * A Netty client for sending task messages to a remote destination (Netty server).
  *
- * Implementation details:
+ * <p>Implementation details:
  *
- * - Sending messages, i.e. writing to the channel, is performed asynchronously. - Messages are sent in batches to optimize for network
- * throughput at the expense of network latency.  The message batch size is configurable. - Connecting and reconnecting are performed
- * asynchronously. - Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote
+ * <p>Sending messages, i.e. writing to the channel, is performed asynchronously. Messages are sent in batches to optimize for network
+ * throughput at the expense of network latency.  The message batch size is configurable. Connecting and reconnecting are performed
+ * asynchronously. Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote
  * destination is currently unavailable.
  */
 public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
     private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    /**
+     * Periodically checks for connected channel in order to avoid loss of messages.
+     */
+    private static final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
 
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
-    private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
+    private static final Timer TIMER = new Timer("Netty-ChannelAlive-Timer", true);
     protected final String dstAddressPrefixedName;
     private final Map<String, Object> topoConf;
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private final ClientBootstrap bootstrap;
+    private final EventLoopGroup eventLoopGroup;
+    private final Bootstrap bootstrap;
     private final InetSocketAddress dstAddress;
 
     /**
@@ -100,10 +107,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      */
     private final AtomicInteger messagesLost = new AtomicInteger(0);
     /**
-     * Periodically checks for connected channel in order to avoid loss of messages
-     */
-    private final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
-    /**
      * Number of messages buffered in memory.
      */
     private final AtomicLong pendingMessages = new AtomicLong(0);
@@ -123,7 +126,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      */
     private volatile boolean closing = false;
 
-    Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, ChannelFactory factory, HashedWheelTimer scheduler, String host,
+    Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
+        EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
            int port) {
         this.topoConf = topoConf;
         closing = false;
@@ -135,7 +139,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
         LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
                  host, port, bufferSize, lowWatermark, highWatermark);
-        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
 
         int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
@@ -143,11 +146,22 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
         // Initiate connection to remote destination
-        bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
+        this.eventLoopGroup = eventLoopGroup;
+        // Configure the bootstrap used to connect to the remote destination
+        bootstrap = new Bootstrap()
+            .group(this.eventLoopGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_SNDBUF, bufferSize)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         launchChannelAliveThread();
         scheduleConnect(NO_DELAY_MS);
+        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
         batcher = new MessageBuffer(messageBatchSize);
         String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
         if (clazz == null) {
@@ -167,7 +181,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private void launchChannelAliveThread() {
         // netty TimerTask is already defined and hence a fully
         // qualified name
-        timer.schedule(new java.util.TimerTask() {
+        TIMER.schedule(new java.util.TimerTask() {
             public void run() {
                 try {
                     LOG.debug("running timer task, address {}", dstAddress);
@@ -183,20 +197,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         }, 0, CHANNEL_ALIVE_INTERVAL_MS);
     }
 
-    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize,
-                                                  int lowWatermark, int highWatermark,
-                                                  Map<String, Object> topoConf,
-                                                  AtomicBoolean[] remoteBpStatus) {
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", bufferSize);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("writeBufferLowWaterMark", lowWatermark);
-        bootstrap.setOption("writeBufferHighWaterMark", highWatermark);
-        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
-        return bootstrap;
-    }
-
     private String prefixedName(InetSocketAddress dstAddress) {
         if (null != dstAddress) {
             return PREFIX + dstAddress.toString();
@@ -205,7 +205,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     /**
-     * Enqueue a task message to be sent to server
+     * Schedule a connection attempt to the remote destination after the given delay.
      */
     private void scheduleConnect(long delayMs) {
         scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
@@ -216,14 +216,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     private boolean connectionEstablished(Channel channel) {
-        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
-        // established iff the channel is connected.  That is, a TCP-based channel must be in the CONNECTED state before
-        // anything can be read or written to the channel.
-        //
+        // The connection is ready once the channel is active.
         // See:
-        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
-        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
-        return channel != null && channel.isConnected();
+        // - http://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h4-19
+        return channel != null && channel.isActive();
     }
 
     /**
@@ -270,14 +266,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         throw new RuntimeException("Client connection should not send BackPressure status");
     }
 
-    @Override
-    public void send(int taskId, byte[] payload) {
-        TaskMessage msg = new TaskMessage(taskId, payload);
-        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
-        wrapper.add(msg);
-        send(wrapper.iterator());
-    }
-
     /**
      * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
      */
@@ -331,7 +319,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                 if (idleCounter == 0) { // check avoids multiple log msgs when in a idle loop
                     LOG.debug("Experiencing Back Pressure from Netty. Entering BackPressure Wait");
                 }
-                if (!channel.isConnected()) {
+                if (!channel.isActive()) {
                     throw new IOException("Connection disconnected");
                 }
                 idleCounter = waitStrategy.idle(idleCounter);
@@ -386,9 +374,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     /**
      * Asynchronously writes the message batch to the channel.
      *
-     * If the write operation fails, then we will close the channel and trigger a reconnect.
+     * <p>If the write operation fails, then we will close the channel and trigger a reconnect.
      */
-    private void flushMessages(Channel channel, final MessageBatch batch) {
+    private void flushMessages(final Channel channel, final MessageBatch batch) {
         if (null == batch || batch.isEmpty()) {
             return;
         }
@@ -397,8 +385,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
         pendingMessages.addAndGet(numMessages);
 
-        ChannelFuture future = channel.write(batch);
+        ChannelFuture future = channel.writeAndFlush(batch);
         future.addListener(new ChannelFutureListener() {
+            @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 pendingMessages.addAndGet(0 - numMessages);
                 if (future.isSuccess()) {
@@ -406,8 +395,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                     messagesSent.getAndAdd(batch.size());
                 } else {
                     LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
-                              future.getCause());
-                    closeChannelAndReconnect(future.getChannel());
+                              future.cause());
+                    closeChannelAndReconnect(future.channel());
                     messagesLost.getAndAdd(numMessages);
                 }
             }
@@ -417,9 +406,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
     /**
      * Schedule a reconnect if we closed a non-null channel, and acquired the right to provide a replacement by successfully setting a null
-     * to the channel field
+     * to the channel field.
      *
-     * @param channel
+     * @param channel the channel to close
      * @return if the call scheduled a re-connect task
      */
     private boolean closeChannelAndReconnect(Channel channel) {
@@ -461,8 +450,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             try {
                 long deltaMs = System.currentTimeMillis() - startMs;
                 if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
-                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
-                              "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
+                        + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                     break;
                 }
                 Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
@@ -488,7 +477,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     @Override
     public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
         Map<Integer, Double> loadCache = serverLoad;
-        Map<Integer, Load> ret = new HashMap<Integer, Load>();
+        Map<Integer, Load> ret = new HashMap<>();
         if (loadCache != null) {
             double clientLoad = Math.min(pendingMessages.get(), 1024) / 1024.0;
             for (Integer task : tasks) {
@@ -521,34 +510,26 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         return topoConf;
     }
 
-    /**
-     * ISaslClient interface
-     **/
-    public void channelConnected(Channel channel) {
-        //        setChannel(channel);
-    }
-
-    public void channelReady() {
+    @Override
+    public void channelReady(Channel channel) {
         saslChannelReady.set(true);
     }
 
+    @Override
     public String name() {
         return (String) topoConf.get(Config.TOPOLOGY_NAME);
     }
 
+    @Override
     public String secretKey() {
         return SaslUtils.getSecretKey(topoConf);
     }
 
-    /**
-     * end
-     **/
-
     private String srcAddressName() {
         String name = null;
         Channel channel = channelRef.get();
         if (channel != null) {
-            SocketAddress address = channel.getLocalAddress();
+            SocketAddress address = channel.localAddress();
             if (address != null) {
                 name = address.toString();
             }
@@ -562,15 +543,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     /**
-     * Called by Netty thread on change in channel interest
-     *
-     * @param channel
-     */
-    public void notifyInterestChanged(Channel channel) {
-        // NOOP since we are checking channel.isWritable in writeMessage
-    }
-
-    /**
      * Asynchronously establishes a Netty connection to the remote address This task runs on a single thread shared among all clients, and
      * thus should not perform operations that block.
      */
@@ -604,7 +576,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
                         // This call returns immediately
-                        Channel newChannel = future.getChannel();
+                        Channel newChannel = future.channel();
 
                         if (future.isSuccess() && connectionEstablished(newChannel)) {
                             boolean setChannel = channelRef.compareAndSet(null, newChannel);
@@ -616,7 +588,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                                          messagesLost.get());
                             }
                         } else {
-                            Throwable cause = future.getCause();
+                            Throwable cause = future.cause();
                             reschedule(cause);
                             if (newChannel != null) {
                                 newChannel.close();
@@ -626,8 +598,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                 });
             } else {
                 close();
-                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                                           connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after "
+                    + connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index e3bc8a5..27ccd04 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -15,40 +15,36 @@ package org.apache.storm.messaging.netty;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.Config;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.util.HashedWheelTimer;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
 import org.apache.storm.utils.ObjectReader;
 
 public class Context implements IContext {
     private Map<String, Object> topoConf;
     private List<Server> serverConnections;
-    private NioClientSocketChannelFactory clientChannelFactory;
+    private EventLoopGroup workerEventLoopGroup;
     private HashedWheelTimer clientScheduleService;
 
     /**
      * initialization per Storm configuration
      */
+    @Override
     public void prepare(Map<String, Object> topoConf) {
         this.topoConf = topoConf;
         serverConnections = new ArrayList<>();
 
-        //each context will have a single client channel factory
+        //each context will have a single client channel worker event loop group
         int maxWorkers = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
-        if (maxWorkers > 0) {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                                     Executors.newCachedThreadPool(workerFactory), maxWorkers);
-        } else {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                                     Executors.newCachedThreadPool(workerFactory));
-        }
+        // 0 means DEFAULT_EVENT_LOOP_THREADS
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
 
         clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
     }
@@ -56,6 +52,7 @@ public class Context implements IContext {
     /**
      * establish a server with a binding port
      */
+    @Override
     public synchronized IConnection bind(String storm_id, int port) {
         Server server = new Server(topoConf, port);
         serverConnections.add(server);
@@ -65,14 +62,16 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
+    @Override
     public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
-        return new Client(topoConf, remoteBpStatus, clientChannelFactory,
+        return new Client(topoConf, remoteBpStatus, workerEventLoopGroup,
                                         clientScheduleService, host, port);
     }
 
     /**
      * terminate this context
      */
+    @Override
     public synchronized void term() {
         clientScheduleService.stop();
 
@@ -81,8 +80,8 @@ public class Context implements IContext {
         }
         serverConnections = null;
 
-        //we need to release resources associated with client channel factory
-        clientChannelFactory.releaseExternalResources();
+        //we need to release resources associated with the worker event loop group
+        workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
 
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
index c8b8c2c..3836faf 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
@@ -12,10 +12,8 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
 
 public enum ControlMessage implements INettySerializable {
     CLOSE_MESSAGE((short) -100),
@@ -25,7 +23,7 @@ public enum ControlMessage implements INettySerializable {
     SASL_TOKEN_MESSAGE_REQUEST((short) -202),
     SASL_COMPLETE_REQUEST((short) -203);
 
-    private short code;
+    private final short code;
 
     //private constructor
     private ControlMessage(short code) {
@@ -46,27 +44,21 @@ public enum ControlMessage implements INettySerializable {
     }
 
     public static ControlMessage read(byte[] serial) {
-        ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial);
+        ByteBuf cm_buffer = Unpooled.wrappedBuffer(serial);
+        try{
         return mkMessage(cm_buffer.getShort(0));
+        } finally {
+            cm_buffer.release();
+        }
     }
 
+    @Override
     public int encodeLength() {
         return 2; //short
     }
 
-    /**
-     * encode the current Control Message into a channel buffer
-     *
-     * @throws IOException
-     */
-    public ChannelBuffer buffer() throws IOException {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
-        write(bout);
-        bout.close();
-        return bout.buffer();
-    }
-
-    public void write(ChannelBufferOutputStream bout) throws IOException {
-        bout.writeShort(code);
+    @Override
+    public void write(ByteBuf buf) {
+        buf.writeShort(code);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
index a804abc..a12a96c 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
@@ -12,11 +12,14 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
 
 public interface INettySerializable {
-    ChannelBuffer buffer() throws IOException;
+    /**
+     * Serialize this object to ByteBuf.
+     * @param dest The ByteBuf to serialize to
+     */
+    void write(ByteBuf dest);
 
     int encodeLength();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
index 3ebe0dc..813448a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
@@ -12,12 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.Channel;
 
 public interface ISaslClient {
-    void channelConnected(Channel channel);
-
-    void channelReady();
+    void channelReady(Channel channel);
 
     String name();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
index 32af385..07f47a5 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
@@ -12,7 +12,7 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.Channel;
 
 public interface ISaslServer extends IServer {
     String name();

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
index 977a38b..4b095d6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
@@ -12,12 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.Channel;
 
 public interface IServer {
-    void channelConnected(Channel c);
+    void channelActive(Channel c);
 
     void received(Object message, String remote, Channel channel) throws InterruptedException;
-
-    void closeChannel(Channel c);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
index 5dbf4b6..472f45b 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -14,27 +14,24 @@ package org.apache.storm.messaging.netty;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
+public class KerberosSaslClientHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(KerberosSaslClientHandler.class);
-    long start_time;
-    private ISaslClient client;
+    private final long start_time;
+    private final ISaslClient client;
     /**
      * Used for client or server's token to send or receive from each other.
      */
-    private Map<String, Object> topoConf;
-    private String jaas_section;
-    private String host;
+    private final Map<String, Object> topoConf;
+    private final String jaas_section;
+    private final String host;
 
     public KerberosSaslClientHandler(ISaslClient client, Map<String, Object> topoConf, String jaas_section, String host) throws
         IOException {
@@ -46,85 +43,89 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-                                 ChannelStateEvent event) {
+    public void channelActive(ChannelHandlerContext ctx) {
         // register the newly established channel
-        Channel channel = ctx.getChannel();
-        client.channelConnected(channel);
+        Channel channel = ctx.channel();
 
         LOG.info("Connection established from {} to {}",
-                 channel.getLocalAddress(), channel.getRemoteAddress());
+                 channel.localAddress(), channel.remoteAddress());
 
         try {
-            KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
-                .get(channel);
+            KerberosSaslNettyClient saslNettyClient = channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).get();
 
             if (saslNettyClient == null) {
                 LOG.debug("Creating saslNettyClient now for channel: {}",
                           channel);
                 saslNettyClient = new KerberosSaslNettyClient(topoConf, jaas_section, host);
-                KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
-                                                                            saslNettyClient);
+                channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).set(saslNettyClient);
             }
             LOG.debug("Going to initiate Kerberos negotiations.");
             byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
             LOG.debug("Sending initial challenge: {}", initialChallenge);
-            channel.write(new SaslMessageToken(initialChallenge));
+            channel.writeAndFlush(new SaslMessageToken(initialChallenge), channel.voidPromise());
         } catch (Exception e) {
             LOG.error("Failed to authenticate with server due to error: ",
                       e);
         }
-        return;
-
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
-        throws Exception {
-        LOG.debug("send/recv time (ms): {}",
-                  (System.currentTimeMillis() - start_time));
+    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
+        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
 
-        Channel channel = ctx.getChannel();
+        // examine the response message from server
+        if (message instanceof ControlMessage) {
+            handleControlMessage(ctx, (ControlMessage) message);
+        } else if (message instanceof SaslMessageToken) {
+            handleSaslMessageToken(ctx, (SaslMessageToken) message);
+        } else {
+            LOG.error("Unexpected message from server: {}", message);
+        }
+    }
 
+    private KerberosSaslNettyClient getChannelSaslClient(Channel channel) throws Exception {
         // Generate SASL response to server using Channel-local SASL client.
-        KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
-            .get(channel);
+        KerberosSaslNettyClient saslNettyClient = channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).get();
         if (saslNettyClient == null) {
             throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
         }
+        return saslNettyClient;
+    }
 
-        // examine the response message from server
-        if (event.getMessage() instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage) event.getMessage();
-            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+    private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception {
+        Channel channel = ctx.channel();
+        KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
+        if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
                 LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
 
                 if (!saslNettyClient.isComplete()) {
-                    String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
-                    LOG.error(message);
-                    throw new Exception(message);
+                String errorMessage =
+                    "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
+                LOG.error(errorMessage);
+                throw new Exception(errorMessage);
                 }
-                ctx.getPipeline().remove(this);
-                this.client.channelReady();
+            ctx.pipeline().remove(this);
+            this.client.channelReady(channel);
 
-                // We call fireMessageReceived since the client is allowed to
+            // We call fireChannelRead since the client is allowed to
                 // perform this request. The client's request will now proceed
                 // to the next pipeline component namely StormClientHandler.
-                Channels.fireMessageReceived(ctx, msg);
+            ctx.fireChannelRead(controlMessage);
             } else {
-                LOG.warn("Unexpected control message: {}", msg);
+            LOG.warn("Unexpected control message: {}", controlMessage);
             }
-            return;
-        } else if (event.getMessage() instanceof SaslMessageToken) {
-            SaslMessageToken saslTokenMessage = (SaslMessageToken) event
-                .getMessage();
+    }
+
+    private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
+        Channel channel = ctx.channel();
+        KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             LOG.debug("Responding to server's token of length: {}",
-                      saslTokenMessage.getSaslToken().length);
+            saslMessageToken.getSaslToken().length);
 
             // Generate SASL response (but we only actually send the response if
             // it's non-null.
             byte[] responseToServer = saslNettyClient
-                .saslResponse(saslTokenMessage);
+            .saslResponse(saslMessageToken);
             if (responseToServer == null) {
                 // If we generate a null response, then authentication has completed
                 // (if not, warn), and return without sending a response back to the
@@ -134,18 +135,14 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
                     LOG.warn("Generated a null response, but authentication is not complete.");
                     throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
                 }
-                this.client.channelReady();
-                return;
+            this.client.channelReady(channel);
             } else {
                 LOG.debug("Response to server token has length: {}",
                           responseToServer.length);
-            }
             // Construct a message containing the SASL response and send it to the
             // server.
             SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
-            channel.write(saslResponse);
-        } else {
-            LOG.error("Unexpected message from server: {}", event.getMessage());
+            channel.writeAndFlush(saslResponse, channel.voidPromise());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
index fbfd785..29746ff 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -12,15 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class KerberosSaslNettyClientState {
 
-    public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
-        protected KerberosSaslNettyClient initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<KerberosSaslNettyClient> KERBEROS_SASL_NETTY_CLIENT = AttributeKey.valueOf("kerberos.sasl.netty.client");
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
index 302903d..1816ffc 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -12,14 +12,9 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class KerberosSaslNettyServerState {
 
-    public static final ChannelLocal<KerberosSaslNettyServer> getKerberosSaslNettyServer = new ChannelLocal<KerberosSaslNettyServer>() {
-        protected KerberosSaslNettyServer initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<KerberosSaslNettyServer> KERBOROS_SASL_NETTY_SERVER = AttributeKey.valueOf("kerboros.sasl.netty.server");
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
index da94d0b..0356538 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -15,26 +15,23 @@ package org.apache.storm.messaging.netty;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
+public class KerberosSaslServerHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(KerberosSaslServerHandler.class);
-    ISaslServer server;
+    private final ISaslServer server;
     /**
      * Used for client or server's token to send or receive from each other.
      */
-    private Map<String, Object> topoConf;
-    private String jaas_section;
-    private List<String> authorizedUsers;
+    private final Map<String, Object> topoConf;
+    private final String jaas_section;
+    private final List<String> authorizedUsers;
 
     public KerberosSaslServerHandler(ISaslServer server, Map<String, Object> topoConf, String jaas_section,
                                      List<String> authorizedUsers) throws IOException {
@@ -45,15 +42,12 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             return;
         }
 
-        Channel channel = ctx.getChannel();
-
+        Channel channel = ctx.channel();
 
         if (msg instanceof SaslMessageToken) {
             // initialize server-side SASL functionality, if we haven't yet
@@ -62,22 +56,20 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
             try {
                 LOG.debug("Got SaslMessageToken!");
 
-                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
-                    .get(channel);
+                KerberosSaslNettyServer saslNettyServer = channel.attr(KerberosSaslNettyServerState.KERBOROS_SASL_NETTY_SERVER).get();
                 if (saslNettyServer == null) {
                     LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
                     try {
                         saslNettyServer = new KerberosSaslNettyServer(topoConf, jaas_section, authorizedUsers);
-                        KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
-                                                                                    saslNettyServer);
+                        channel.attr(KerberosSaslNettyServerState.KERBOROS_SASL_NETTY_SERVER).set(saslNettyServer);
                     } catch (RuntimeException ioe) {
                         LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
-                                  channel.getLocalAddress(), channel.getRemoteAddress());
+                                  channel.localAddress(), channel.remoteAddress());
                         throw ioe;
                     }
                 } else {
                     LOG.debug("Found existing saslNettyServer on server: {} for client {}",
-                              channel.getLocalAddress(), channel.getRemoteAddress());
+                              channel.localAddress(), channel.remoteAddress());
                 }
 
                 byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
@@ -86,10 +78,10 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
                 SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
 
                 if (saslTokenMessageRequest.getSaslToken() == null) {
-                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                    channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                 } else {
                     // Send response to client.
-                    channel.write(saslTokenMessageRequest);
+                    channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
                 }
 
                 if (saslNettyServer.isComplete()) {
@@ -97,12 +89,11 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
                     // SASL-Complete message to the client.
                     LOG.info("SASL authentication is complete for client with username: {}",
                              saslNettyServer.getUserName());
-                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                    channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                     LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
-                    ctx.getPipeline().remove(this);
+                    ctx.pipeline().remove(this);
                     server.authenticated(channel);
                 }
-                return;
             } catch (Exception ex) {
                 LOG.error("Failed to handle SaslMessageToken: ", ex);
                 throw ex;
@@ -115,15 +106,13 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
             // authentication has not completed.
             LOG.warn("Sending upstream an unexpected non-SASL message : {}",
                      msg);
-            Channels.fireMessageReceived(ctx, msg);
+            ctx.fireChannelRead(msg);
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        if (server != null) {
-            server.closeChannel(e.getChannel());
-        }
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
index acf3893..e29d43c 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
@@ -14,13 +14,12 @@ package org.apache.storm.messaging.netty;
 
 import java.util.ArrayList;
 import org.apache.storm.messaging.TaskMessage;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
 
-class MessageBatch {
-    private int buffer_size;
-    private ArrayList<TaskMessage> msgs;
+class MessageBatch implements INettySerializable {
+
+    private final int buffer_size;
+    private final ArrayList<TaskMessage> msgs;
     private int encoded_length;
 
     MessageBatch(int buffer_size) {
@@ -38,7 +37,6 @@ class MessageBatch {
         encoded_length += msgEncodeLength(msg);
     }
 
-
     private int msgEncodeLength(TaskMessage taskMsg) {
         if (taskMsg == null) {
             return 0;
@@ -72,30 +70,30 @@ class MessageBatch {
         return msgs.size();
     }
 
+    @Override
+    public int encodeLength() {
+        return encoded_length;
+    }
+    
     /**
      * create a buffer containing the encoding of this batch
      */
-    ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
-
+    @Override
+    public void write(ByteBuf dest) {
         for (TaskMessage msg : msgs) {
-            writeTaskMessage(bout, msg);
+            writeTaskMessage(dest, msg);
         }
 
         //add a END_OF_BATCH indicator
-        ControlMessage.EOB_MESSAGE.write(bout);
-
-        bout.close();
-
-        return bout.buffer();
+        ControlMessage.EOB_MESSAGE.write(dest);
     }
 
     /**
-     * write a TaskMessage into a stream
+     * write a TaskMessage into a buffer
      *
      * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload ... byte[]     *
      */
-    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+    private void writeTaskMessage(ByteBuf buf, TaskMessage message) {
         int payload_len = 0;
         if (message.message() != null) {
             payload_len = message.message().length;
@@ -106,10 +104,10 @@ class MessageBatch {
             throw new RuntimeException("Task ID should not exceed " + Short.MAX_VALUE);
         }
 
-        bout.writeShort((short) task_id);
-        bout.writeInt(payload_len);
+        buf.writeShort((short) task_id);
+        buf.writeInt(payload_len);
         if (payload_len > 0) {
-            bout.write(message.message());
+            buf.writeBytes(message.message());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
index 5b800f1..fcd8f0e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -16,14 +16,13 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
 
-public class MessageDecoder extends FrameDecoder {
+public class MessageDecoder extends ByteToMessageDecoder {
 
-    private KryoValuesDeserializer deser;
+    private final KryoValuesDeserializer deser;
 
     public MessageDecoder(KryoValuesDeserializer deser) {
         this.deser = deser;
@@ -37,12 +36,13 @@ public class MessageDecoder extends FrameDecoder {
      *  len ... int(4)
      *  payload ... byte[]     *
      */
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
         // Make sure that we have received at least a short 
         long available = buf.readableBytes();
         if (available < 2) {
             //need more data
-            return null;
+            return;
         }
 
         List<Object> ret = new ArrayList<>();
@@ -67,7 +67,8 @@ public class MessageDecoder extends FrameDecoder {
                 if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
                     continue;
                 } else {
-                    return ctrl_msg;
+                    out.add(ctrl_msg);
+                    return;
                 }
             }
 
@@ -77,28 +78,30 @@ public class MessageDecoder extends FrameDecoder {
                 if (buf.readableBytes() < 4) {
                     //need more data
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
 
                 // Read the length field.
                 int length = buf.readInt();
                 if (length <= 0) {
-                    return new SaslMessageToken(null);
+                    out.add(new SaslMessageToken(null));
+                    return;
                 }
 
                 // Make sure if there's enough bytes in the buffer.
                 if (buf.readableBytes() < length) {
                     // The whole bytes were not received yet - return null.
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
 
                 // There's enough bytes in the buffer. Read it.  
-                ChannelBuffer payload = buf.readBytes(length);
-
+                byte[] bytes = new byte[length];
+                buf.readBytes(bytes);
                 // Successfully decoded a frame.
                 // Return a SaslTokenMessageRequest object
-                return new SaslMessageToken(payload.array());
+                out.add(new SaslMessageToken(bytes));
+                return;
             }
 
             // case 3: BackPressureStatus
@@ -107,18 +110,18 @@ public class MessageDecoder extends FrameDecoder {
                 if (available < 4) {
                     //Need  more data
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
                 int dataLen = buf.readInt();
                 if (available < 4 + dataLen) {
                     // need more data
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
                 byte[] bytes = new byte[dataLen];
                 buf.readBytes(bytes);
-                return BackPressureStatus.read(bytes, deser);
-
+                out.add(BackPressureStatus.read(bytes, deser));
+                return;
             }
 
             // case 4: task Message
@@ -149,18 +152,16 @@ public class MessageDecoder extends FrameDecoder {
             available -= length;
 
             // There's enough bytes in the buffer. Read it.
-            ChannelBuffer payload = buf.readBytes(length);
-
+            byte[] bytes = new byte[length];
+            buf.readBytes(bytes);
 
             // Successfully decoded a frame.
             // Return a TaskMessage object
-            ret.add(new TaskMessage(code, payload.array()));
+            ret.add(new TaskMessage(code, bytes));
         }
 
-        if (ret.size() == 0) {
-            return null;
-        } else {
-            return ret;
+        if (!ret.isEmpty()) {
+            out.add(ret);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
deleted file mode 100644
index d2e528c..0000000
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.messaging.netty;
-
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-public class MessageEncoder extends OneToOneEncoder {
-
-    private KryoValuesSerializer ser;
-
-    public MessageEncoder(KryoValuesSerializer ser) {
-        this.ser = ser;
-    }
-
-    @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
-        if (obj instanceof ControlMessage) {
-            return ((ControlMessage) obj).buffer();
-        }
-
-        if (obj instanceof MessageBatch) {
-            return ((MessageBatch) obj).buffer();
-        }
-
-        if (obj instanceof BackPressureStatus) {
-            return ((BackPressureStatus) obj).buffer(ser);
-        }
-
-        if (obj instanceof SaslMessageToken) {
-            return ((SaslMessageToken) obj).buffer();
-        }
-
-        throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
index 3fe98c4..f66510e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -14,21 +14,14 @@ package org.apache.storm.messaging.netty;
 
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.shade.org.jboss.netty.util.ThreadNameDeterminer;
-import org.apache.storm.shade.org.jboss.netty.util.ThreadRenamingRunnable;
 
 public class NettyRenameThreadFactory implements ThreadFactory {
 
-    static final NettyUncaughtExceptionHandler uncaughtExceptionHandler = new NettyUncaughtExceptionHandler();
+    private static final NettyUncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = new NettyUncaughtExceptionHandler();
 
-    static {
-        //Rename Netty threads
-        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
-    }
-
-    final ThreadGroup group;
-    final AtomicInteger index = new AtomicInteger(1);
-    final String name;
+    private final ThreadGroup group;
+    private final AtomicInteger index = new AtomicInteger(1);
+    private final String name;
 
     public NettyRenameThreadFactory(String name) {
         SecurityManager s = System.getSecurityManager();
@@ -37,6 +30,7 @@ public class NettyRenameThreadFactory implements ThreadFactory {
         this.name = name;
     }
 
+    @Override
     public Thread newThread(Runnable r) {
         Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
         if (t.isDaemon()) {
@@ -45,7 +39,7 @@ public class NettyRenameThreadFactory implements ThreadFactory {
         if (t.getPriority() != Thread.NORM_PRIORITY) {
             t.setPriority(Thread.NORM_PRIORITY);
         }
-        t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        t.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
         return t;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java
new file mode 100644
index 0000000..0eab349
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToByteEncoder;
+
+@ChannelHandler.Sharable
+public class NettySerializableMessageEncoder extends MessageToByteEncoder<INettySerializable> {
+
+    public static final NettySerializableMessageEncoder INSTANCE = new NettySerializableMessageEncoder();
+    
+    private NettySerializableMessageEncoder() {}
+    
+    @Override
+    protected void encode(ChannelHandlerContext ctx, INettySerializable msg, ByteBuf out) throws Exception {
+        msg.write(out);
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, INettySerializable msg, boolean preferDirect) throws Exception {
+        return ctx.alloc().ioBuffer(msg.encodeLength());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
index cafc109..669c88a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
@@ -12,10 +12,8 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +50,8 @@ public class SaslMessageToken implements INettySerializable {
     }
 
     public static SaslMessageToken read(byte[] serial) {
-        ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial);
+        ByteBuf sm_buffer = Unpooled.wrappedBuffer(serial);
+        try {
         short identifier = sm_buffer.readShort();
         int payload_len = sm_buffer.readInt();
         if (identifier != IDENTIFIER) {
@@ -61,6 +60,9 @@ public class SaslMessageToken implements INettySerializable {
         byte token[] = new byte[payload_len];
         sm_buffer.readBytes(token, 0, payload_len);
         return new SaslMessageToken(token);
+        } finally {
+            sm_buffer.release();
+        }
     }
 
     /**
@@ -81,31 +83,29 @@ public class SaslMessageToken implements INettySerializable {
         this.token = token;
     }
 
+    @Override
     public int encodeLength() {
         return 2 + 4 + token.length;
     }
 
     /**
-     * encode the current SaslToken Message into a channel buffer SaslTokenMessageRequest is encoded as: identifier .... short(2) payload
+     * encode the current SaslToken Message into a ByteBuf.
+     * 
+     * <p>SaslTokenMessageRequest is encoded as: identifier .... short(2) payload
      * length .... int payload .... byte[]
-     *
-     * @throws IOException
      */
-    public ChannelBuffer buffer() throws IOException {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
-            ChannelBuffers.directBuffer(encodeLength()));
+    @Override
+    public void write(ByteBuf dest) {
         int payload_len = 0;
         if (token != null) {
             payload_len = token.length;
         }
 
-        bout.writeShort(IDENTIFIER);
-        bout.writeInt(payload_len);
+        dest.writeShort(IDENTIFIER);
+        dest.writeInt(payload_len);
 
         if (payload_len > 0) {
-            bout.write(token);
+            dest.writeBytes(token);
         }
-        bout.close();
-        return bout.buffer();
     }
 }