You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/09 11:37:44 UTC

[1/5] storm git commit: STORM-1038: Upgrade to Netty 4.x. See https://github.com/apache/storm/pull/728 for the original contribution.

Repository: storm
Updated Branches:
  refs/heads/master c24cf9764 -> 4aa6b5e18


STORM-1038: Upgrade to Netty 4.x. See https://github.com/apache/storm/pull/728 for the original contribution.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d293dd32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d293dd32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d293dd32

Branch: refs/heads/master
Commit: d293dd32ed64669164620ce984af53441508eae5
Parents: c24cf97
Author: Hang Sun <hs...@shopzilla.com>
Authored: Fri May 11 15:57:47 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Jun 8 19:05:02 2018 +0200

----------------------------------------------------------------------
 conf/defaults.yaml |  3 +++
 pom.xml            | 20 ++++++++++----------
 2 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d293dd32/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index a304aa5..4b3b1d0 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -226,6 +226,8 @@ storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
+io.netty.noPreferDirect: false
+io.netty.allocator.type: "pooled"
 
 # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
 storm.messaging.netty.transfer.batch.size: 262144
@@ -341,6 +343,7 @@ pacemaker.servers: []
 pacemaker.port: 6699
 pacemaker.base.threads: 10
 pacemaker.max.threads: 50
+pacemaker.client.max.threads: 2
 pacemaker.thread.timeout: 10
 pacemaker.childopts: "-Xmx1024m"
 pacemaker.auth.method: "NONE"

http://git-wip-us.apache.org/repos/asf/storm/blob/d293dd32/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8a6df9e..3da30fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -222,7 +222,7 @@
             <id>Ethanlm</id>
             <name>Ethan Li</name>
             <email>ethanli@apache.org</email>
-            <roles>
+            <roles>	
                 <role>Committer</role>
             </roles>
             <timezone>-6</timezone>
@@ -275,7 +275,7 @@
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>16.0.1</guava.version>
         <auto-service.version>1.0-rc3</auto-service.version>
-        <netty.version>3.9.9.Final</netty.version>
+        <netty.version>4.0.33.Final</netty.version>
         <sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.8.2</log4j.version>
@@ -643,7 +643,7 @@
                 <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
             </properties>
         </profile> 
-        <profile>
+	<profile>
             <id>integration-tests-only</id>
             <properties>
                 <!--Java-->
@@ -937,12 +937,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.google.auto.service</groupId>
-                <artifactId>auto-service</artifactId>
-                <version>${auto-service.version}</version>
-                <optional>true</optional>
-            </dependency>
+                <dependency>
+                    <groupId>com.google.auto.service</groupId>
+                    <artifactId>auto-service</artifactId>
+                    <version>${auto-service.version}</version>
+                    <optional>true</optional>
+                </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>
@@ -970,7 +970,7 @@
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
-                <artifactId>netty</artifactId>
+                <artifactId>netty-all</artifactId>
                 <version>${netty.version}</version>
             </dependency>
             <dependency>


[4/5] storm git commit: STORM-1038: Upgrade to latest Netty

Posted by ka...@apache.org.
STORM-1038: Upgrade to latest Netty

Use manually allocated heap buffer for thrift encoder
Fix flaky test
Set correct checkstyle violations
Whitespace


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e2308285
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e2308285
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e2308285

Branch: refs/heads/master
Commit: e2308285d28a2df4a1435a1f4fb304630749c29a
Parents: d293dd3
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri May 11 16:55:35 2018 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:34:51 2018 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 -
 pom.xml                                         |  18 +--
 shaded-deps/pom.xml                             |   8 +-
 storm-client/pom.xml                            |   2 +-
 .../src/jvm/org/apache/storm/Config.java        |   8 +
 .../org/apache/storm/daemon/worker/Worker.java  |   9 +-
 .../org/apache/storm/messaging/IConnection.java |   8 -
 .../apache/storm/messaging/local/Context.java   |  19 ---
 .../messaging/netty/BackPressureStatus.java     |  11 +-
 .../netty/BackPressureStatusEncoder.java        |  37 +++++
 .../apache/storm/messaging/netty/Client.java    | 158 ++++++++-----------
 .../apache/storm/messaging/netty/Context.java   |  31 ++--
 .../storm/messaging/netty/ControlMessage.java   |  32 ++--
 .../messaging/netty/INettySerializable.java     |   9 +-
 .../storm/messaging/netty/ISaslClient.java      |   6 +-
 .../storm/messaging/netty/ISaslServer.java      |   2 +-
 .../apache/storm/messaging/netty/IServer.java   |   6 +-
 .../netty/KerberosSaslClientHandler.java        | 107 ++++++-------
 .../netty/KerberosSaslNettyClientState.java     |   9 +-
 .../netty/KerberosSaslNettyServerState.java     |   9 +-
 .../netty/KerberosSaslServerHandler.java        |  53 +++----
 .../storm/messaging/netty/MessageBatch.java     |  40 +++--
 .../storm/messaging/netty/MessageDecoder.java   |  53 ++++---
 .../storm/messaging/netty/MessageEncoder.java   |  50 ------
 .../netty/NettyRenameThreadFactory.java         |  18 +--
 .../netty/NettySerializableMessageEncoder.java  |  37 +++++
 .../storm/messaging/netty/SaslMessageToken.java |  32 ++--
 .../messaging/netty/SaslNettyClientState.java   |   9 +-
 .../messaging/netty/SaslNettyServerState.java   |   9 +-
 .../messaging/netty/SaslStormClientHandler.java | 114 ++++++-------
 .../netty/SaslStormServerAuthorizeHandler.java  |  22 +--
 .../messaging/netty/SaslStormServerHandler.java |  62 ++++----
 .../apache/storm/messaging/netty/Server.java    | 145 ++++++++---------
 .../messaging/netty/StormClientHandler.java     |  35 ++--
 .../netty/StormClientPipelineFactory.java       |  22 +--
 .../messaging/netty/StormServerHandler.java     |  45 +++---
 .../netty/StormServerPipelineFactory.java       |  46 ++++--
 .../apache/storm/pacemaker/PacemakerClient.java |  68 ++++----
 .../storm/pacemaker/PacemakerClientHandler.java |  46 +++---
 .../storm/pacemaker/codec/ThriftDecoder.java    |  23 ++-
 .../storm/pacemaker/codec/ThriftEncoder.java    |  47 +++---
 .../pacemaker/codec/ThriftNettyClientCodec.java |  29 ++--
 .../org/apache/storm/utils/TransferDrainer.java |  73 ++-------
 .../storm/PaceMakerStateStorageFactoryTest.java |  62 ++++----
 storm-core/pom.xml                              | 134 ++++++++--------
 .../apache/storm/messaging/netty/NettyTest.java |  41 +++--
 storm-server/pom.xml                            |   2 +-
 .../storm/pacemaker/IServerMessageHandler.java  |   2 +-
 .../apache/storm/pacemaker/PacemakerServer.java | 108 +++++++------
 .../pacemaker/codec/ThriftNettyServerCodec.java |  34 ++--
 50 files changed, 905 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4b3b1d0..f544903 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -226,8 +226,6 @@ storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
 storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
-io.netty.noPreferDirect: false
-io.netty.allocator.type: "pooled"
 
 # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
 storm.messaging.netty.transfer.batch.size: 262144

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3da30fc..009d0f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,7 +275,7 @@
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>16.0.1</guava.version>
         <auto-service.version>1.0-rc3</auto-service.version>
-        <netty.version>4.0.33.Final</netty.version>
+        <netty.version>4.1.25.Final</netty.version>
         <sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.8.2</log4j.version>
@@ -643,7 +643,7 @@
                 <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
             </properties>
         </profile> 
-	<profile>
+        <profile>
             <id>integration-tests-only</id>
             <properties>
                 <!--Java-->
@@ -937,12 +937,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-                <dependency>
-                    <groupId>com.google.auto.service</groupId>
-                    <artifactId>auto-service</artifactId>
-                    <version>${auto-service.version}</version>
-                    <optional>true</optional>
-                </dependency>
+            <dependency>
+                <groupId>com.google.auto.service</groupId>
+                <artifactId>auto-service</artifactId>
+                <version>${auto-service.version}</version>
+                <optional>true</optional>
+            </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>
@@ -1063,7 +1063,7 @@
                 <groupId>org.apache.calcite</groupId>
                 <artifactId>calcite-core</artifactId>
                 <version>${calcite.version}</version>
-            </dependency> 
+            </dependency>
 
             <!-- kafka artifact dependency needed for storm-kafka -->
             <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/shaded-deps/pom.xml
----------------------------------------------------------------------
diff --git a/shaded-deps/pom.xml b/shaded-deps/pom.xml
index 8b33c70..97ecebd 100644
--- a/shaded-deps/pom.xml
+++ b/shaded-deps/pom.xml
@@ -112,7 +112,7 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+            <artifactId>netty-all</artifactId>
             <optional>true</optional>
         </dependency>
         <dependency>
@@ -185,7 +185,9 @@
                             <include>commons-collections:commons-collections</include>
                             <include>commons-io:commons-io</include>
                             <include>commons-lang:commons-lang</include>
+                            <!-- Pulled in by Zookeeper -->
                             <include>io.netty:netty</include>
+                            <include>io.netty:netty-all</include>
                             <include>org.apache.curator:*</include>
                             <include>org.apache.httpcomponents:httpclient</include>
                             <include>org.apache.thrift:*</include>
@@ -252,6 +254,10 @@
                             <shadedPattern>org.apache.storm.shade.org.jboss.netty</shadedPattern>
                         </relocation>
                         <relocation>
+                            <pattern>io.netty</pattern>
+                            <shadedPattern>org.apache.storm.shade.io.netty</shadedPattern>
+                        </relocation>
+                        <relocation>
                             <pattern>org.jgrapht</pattern>
                             <shadedPattern>org.apache.storm.shade.org.jgrapht</shadedPattern>
                         </relocation>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index a13359d..2bf0ed1 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -172,7 +172,7 @@
                 <!--Note - the version would be inherited-->
                 <configuration>
                     <excludes>**/generated/**</excludes>
-                    <maxAllowedViolations>3298</maxAllowedViolations>
+                    <maxAllowedViolations>3285</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index db809f6..8dfcee9 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -875,6 +875,14 @@ public class Config extends HashMap<String, Object> {
     @isPositiveNumber
     public static final String PACEMAKER_PORT = "pacemaker.port";
     /**
+     * The maximum number of threads that should be used by the Pacemaker client.
+     * When Pacemaker gets loaded it will spawn new threads, up to
+     * this many total, to handle the load.
+     */
+    @isNumber
+    @isPositiveNumber
+    public static final String PACEMAKER_CLIENT_MAX_THREADS = "pacemaker.client.max.threads";
+    /**
      * This should be one of "DIGEST", "KERBEROS", or "NONE" Determines the mode of authentication the pacemaker server and client use. The
      * client must either match the server, or be NONE. In the case of NONE, no authentication is performed for the client, and if the
      * server is running with DIGEST or KERBEROS, the client can only write to the server (no reads). This is intended to provide a

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 1a7869e..233dfac 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -455,13 +455,9 @@ public class Worker implements Shutdownable, DaemonCommon {
             }
             LOG.info("Shut down executors");
 
-            // this is fine because the only time this is shared is when it's a local context,
-            // in which case it's a noop
-            workerState.mqContext.term();
             LOG.info("Shutting down transfer thread");
             workerState.haltWorkerTransfer();
 
-
             if (transferThread != null) {
                 transferThread.interrupt();
                 transferThread.join();
@@ -479,6 +475,11 @@ public class Worker implements Shutdownable, DaemonCommon {
             workerState.resetLogLevelsTimer.close();
             workerState.flushTupleTimer.close();
             workerState.backPressureCheckTimer.close();
+            
+            // this is fine because the only time this is shared is when it's a local context,
+            // in which case it's a noop
+            workerState.mqContext.term();
+            
             workerState.closeResources();
 
             StormMetricRegistry.stop();

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index 93c8e8f..c2e156c 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -48,14 +48,6 @@ public interface IConnection extends AutoCloseable {
     void sendBackPressureStatus(BackPressureStatus bpStatus);
 
     /**
-     * send a message with taskId and payload
-     *
-     * @param taskId  task ID
-     * @param payload
-     */
-    void send(int taskId, byte[] payload);
-
-    /**
      * send batch messages
      *
      * @param msgs

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index e49ac11..6071cbe 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -39,8 +39,6 @@ public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
     private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
 
-    ;
-
     private static LocalServer getLocalServer(String nodeId, int port) {
         String key = nodeId + "-" + port;
         LocalServer ret = _registry.get(key);
@@ -96,11 +94,6 @@ public class Context implements IContext {
         }
 
         @Override
-        public void send(int taskId, byte[] payload) {
-            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
-        }
-
-        @Override
         public void send(Iterator<TaskMessage> msgs) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
         }
@@ -190,18 +183,6 @@ public class Context implements IContext {
         }
 
         @Override
-        public void send(int taskId, byte[] payload) {
-            TaskMessage message = new TaskMessage(taskId, payload);
-            IConnectionCallback serverCb = _server._cb;
-            if (serverCb != null) {
-                flushPending();
-                serverCb.recv(Arrays.asList(message));
-            } else {
-                _pendingDueToUnregisteredServer.add(message);
-            }
-        }
-
-        @Override
         public void send(Iterator<TaskMessage> msgs) {
             IConnectionCallback serverCb = _server._cb;
             if (serverCb != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
index 4798445..5ead740 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java
@@ -23,8 +23,8 @@ import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.ByteBufAllocator;
 
 // Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
 public class BackPressureStatus {
@@ -42,6 +42,9 @@ public class BackPressureStatus {
         this.id = bpCount.incrementAndGet();
     }
 
+    /**
+     * Constructor.
+     */
     public BackPressureStatus(String workerId, Collection<Integer> bpTasks, Collection<Integer> nonBpTasks) {
         this.workerId = workerId;
         this.id = bpCount.incrementAndGet();
@@ -61,9 +64,9 @@ public class BackPressureStatus {
     /**
      * Encoded as -600 ... short(2) len ... int(4) payload ... byte[]     *
      */
-    public ChannelBuffer buffer(KryoValuesSerializer ser) throws IOException {
+    public ByteBuf buffer(ByteBufAllocator alloc, KryoValuesSerializer ser) throws IOException {
         byte[] serializedBytes = ser.serializeObject(this);
-        ChannelBuffer buff = ChannelBuffers.buffer(SIZE_OF_ID + SIZE_OF_INT + serializedBytes.length);
+        ByteBuf buff = alloc.ioBuffer(SIZE_OF_ID + SIZE_OF_INT + serializedBytes.length);
         buff.writeShort(IDENTIFIER);
         buff.writeInt(serializedBytes.length);
         buff.writeBytes(serializedBytes);

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java
new file mode 100644
index 0000000..be4fff9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatusEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.util.List;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToMessageEncoder;
+
+public class BackPressureStatusEncoder extends MessageToMessageEncoder<BackPressureStatus> {
+
+    private final KryoValuesSerializer ser;
+
+    public BackPressureStatusEncoder(KryoValuesSerializer ser) {
+        this.ser = ser;
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, BackPressureStatus msg, List<Object> out) throws Exception {
+        out.add(msg.buffer(ctx.alloc(), ser));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 91f70af..1ecdd26 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -12,14 +12,14 @@
 
 package org.apache.storm.messaging.netty;
 
+import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
@@ -39,44 +39,51 @@ import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
 import org.apache.storm.policy.WaitStrategyProgressive;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ClientBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFuture;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFutureListener;
-import org.apache.storm.shade.org.jboss.netty.util.HashedWheelTimer;
-import org.apache.storm.shade.org.jboss.netty.util.Timeout;
-import org.apache.storm.shade.org.jboss.netty.util.TimerTask;
+import org.apache.storm.shade.io.netty.bootstrap.Bootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelFuture;
+import org.apache.storm.shade.io.netty.channel.ChannelFutureListener;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
+import org.apache.storm.shade.io.netty.util.Timeout;
+import org.apache.storm.shade.io.netty.util.TimerTask;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.shade.com.google.common.base.Preconditions.checkState;
-
 /**
  * A Netty client for sending task messages to a remote destination (Netty server).
  *
- * Implementation details:
+ * <p>Implementation details:
  *
- * - Sending messages, i.e. writing to the channel, is performed asynchronously. - Messages are sent in batches to optimize for network
- * throughput at the expense of network latency.  The message batch size is configurable. - Connecting and reconnecting are performed
- * asynchronously. - Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote
+ * <p>Sending messages, i.e. writing to the channel, is performed asynchronously. Messages are sent in batches to optimize for network
+ * throughput at the expense of network latency.  The message batch size is configurable. Connecting and reconnecting are performed
+ * asynchronously. Note: The current implementation drops any messages that are being enqueued for sending if the connection to the remote
  * destination is currently unavailable.
  */
 public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
     private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+    /**
+     * Periodically checks for connected channel in order to avoid loss of messages.
+     */
+    private static final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
 
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private static final long NO_DELAY_MS = 0L;
-    private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
+    private static final Timer TIMER = new Timer("Netty-ChannelAlive-Timer", true);
     protected final String dstAddressPrefixedName;
     private final Map<String, Object> topoConf;
     private final StormBoundedExponentialBackoffRetry retryPolicy;
-    private final ClientBootstrap bootstrap;
+    private final EventLoopGroup eventLoopGroup;
+    private final Bootstrap bootstrap;
     private final InetSocketAddress dstAddress;
 
     /**
@@ -100,10 +107,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      */
     private final AtomicInteger messagesLost = new AtomicInteger(0);
     /**
-     * Periodically checks for connected channel in order to avoid loss of messages
-     */
-    private final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
-    /**
      * Number of messages buffered in memory.
      */
     private final AtomicLong pendingMessages = new AtomicLong(0);
@@ -123,7 +126,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      */
     private volatile boolean closing = false;
 
-    Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, ChannelFactory factory, HashedWheelTimer scheduler, String host,
+    Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus,
+        EventLoopGroup eventLoopGroup, HashedWheelTimer scheduler, String host,
            int port) {
         this.topoConf = topoConf;
         closing = false;
@@ -135,7 +139,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
         LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
                  host, port, bufferSize, lowWatermark, highWatermark);
-        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
 
         int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
@@ -143,11 +146,22 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
         // Initiate connection to remote destination
-        bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
+        this.eventLoopGroup = eventLoopGroup;
+        // Initiate connection to remote destination
+        bootstrap = new Bootstrap()
+            .group(this.eventLoopGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_SNDBUF, bufferSize)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .handler(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         launchChannelAliveThread();
         scheduleConnect(NO_DELAY_MS);
+        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
         batcher = new MessageBuffer(messageBatchSize);
         String clazz = (String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY);
         if (clazz == null) {
@@ -167,7 +181,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private void launchChannelAliveThread() {
         // netty TimerTask is already defined and hence a fully
         // qualified name
-        timer.schedule(new java.util.TimerTask() {
+        TIMER.schedule(new java.util.TimerTask() {
             public void run() {
                 try {
                     LOG.debug("running timer task, address {}", dstAddress);
@@ -183,20 +197,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         }, 0, CHANNEL_ALIVE_INTERVAL_MS);
     }
 
-    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize,
-                                                  int lowWatermark, int highWatermark,
-                                                  Map<String, Object> topoConf,
-                                                  AtomicBoolean[] remoteBpStatus) {
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", bufferSize);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("writeBufferLowWaterMark", lowWatermark);
-        bootstrap.setOption("writeBufferHighWaterMark", highWatermark);
-        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, remoteBpStatus, topoConf));
-        return bootstrap;
-    }
-
     private String prefixedName(InetSocketAddress dstAddress) {
         if (null != dstAddress) {
             return PREFIX + dstAddress.toString();
@@ -205,7 +205,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue a task message to be sent to server.
      */
     private void scheduleConnect(long delayMs) {
         scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
@@ -216,14 +216,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     private boolean connectionEstablished(Channel channel) {
-        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
-        // established iff the channel is connected.  That is, a TCP-based channel must be in the CONNECTED state before
-        // anything can be read or written to the channel.
-        //
+        // The connection is ready once the channel is active.
         // See:
-        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
-        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
-        return channel != null && channel.isConnected();
+        // - http://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h4-19
+        return channel != null && channel.isActive();
     }
 
     /**
@@ -270,14 +266,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         throw new RuntimeException("Client connection should not send BackPressure status");
     }
 
-    @Override
-    public void send(int taskId, byte[] payload) {
-        TaskMessage msg = new TaskMessage(taskId, payload);
-        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
-        wrapper.add(msg);
-        send(wrapper.iterator());
-    }
-
     /**
      * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
      */
@@ -331,7 +319,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                 if (idleCounter == 0) { // check avoids multiple log msgs when in a idle loop
                     LOG.debug("Experiencing Back Pressure from Netty. Entering BackPressure Wait");
                 }
-                if (!channel.isConnected()) {
+                if (!channel.isActive()) {
                     throw new IOException("Connection disconnected");
                 }
                 idleCounter = waitStrategy.idle(idleCounter);
@@ -386,9 +374,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     /**
      * Asynchronously writes the message batch to the channel.
      *
-     * If the write operation fails, then we will close the channel and trigger a reconnect.
+     * <p>If the write operation fails, then we will close the channel and trigger a reconnect.
      */
-    private void flushMessages(Channel channel, final MessageBatch batch) {
+    private void flushMessages(final Channel channel, final MessageBatch batch) {
         if (null == batch || batch.isEmpty()) {
             return;
         }
@@ -397,8 +385,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
         pendingMessages.addAndGet(numMessages);
 
-        ChannelFuture future = channel.write(batch);
+        ChannelFuture future = channel.writeAndFlush(batch);
         future.addListener(new ChannelFutureListener() {
+            @Override
             public void operationComplete(ChannelFuture future) throws Exception {
                 pendingMessages.addAndGet(0 - numMessages);
                 if (future.isSuccess()) {
@@ -406,8 +395,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                     messagesSent.getAndAdd(batch.size());
                 } else {
                     LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
-                              future.getCause());
-                    closeChannelAndReconnect(future.getChannel());
+                              future.cause());
+                    closeChannelAndReconnect(future.channel());
                     messagesLost.getAndAdd(numMessages);
                 }
             }
@@ -417,9 +406,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
     /**
      * Schedule a reconnect if we closed a non-null channel, and acquired the right to provide a replacement by successfully setting a null
-     * to the channel field
+     * to the channel field.
      *
-     * @param channel
+     * @param channel the channel to close
      * @return if the call scheduled a re-connect task
      */
     private boolean closeChannelAndReconnect(Channel channel) {
@@ -461,8 +450,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             try {
                 long deltaMs = System.currentTimeMillis() - startMs;
                 if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
-                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
-                              "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not "
+                        + "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
                     break;
                 }
                 Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
@@ -488,7 +477,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     @Override
     public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
         Map<Integer, Double> loadCache = serverLoad;
-        Map<Integer, Load> ret = new HashMap<Integer, Load>();
+        Map<Integer, Load> ret = new HashMap<>();
         if (loadCache != null) {
             double clientLoad = Math.min(pendingMessages.get(), 1024) / 1024.0;
             for (Integer task : tasks) {
@@ -521,34 +510,26 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         return topoConf;
     }
 
-    /**
-     * ISaslClient interface
-     **/
-    public void channelConnected(Channel channel) {
-        //        setChannel(channel);
-    }
-
-    public void channelReady() {
+    @Override
+    public void channelReady(Channel channel) {
         saslChannelReady.set(true);
     }
 
+    @Override
     public String name() {
         return (String) topoConf.get(Config.TOPOLOGY_NAME);
     }
 
+    @Override
     public String secretKey() {
         return SaslUtils.getSecretKey(topoConf);
     }
 
-    /**
-     * end
-     **/
-
     private String srcAddressName() {
         String name = null;
         Channel channel = channelRef.get();
         if (channel != null) {
-            SocketAddress address = channel.getLocalAddress();
+            SocketAddress address = channel.localAddress();
             if (address != null) {
                 name = address.toString();
             }
@@ -562,15 +543,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     }
 
     /**
-     * Called by Netty thread on change in channel interest
-     *
-     * @param channel
-     */
-    public void notifyInterestChanged(Channel channel) {
-        // NOOP since we are checking channel.isWritable in writeMessage
-    }
-
-    /**
      * Asynchronously establishes a Netty connection to the remote address This task runs on a single thread shared among all clients, and
      * thus should not perform operations that block.
      */
@@ -604,7 +576,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
                         // This call returns immediately
-                        Channel newChannel = future.getChannel();
+                        Channel newChannel = future.channel();
 
                         if (future.isSuccess() && connectionEstablished(newChannel)) {
                             boolean setChannel = channelRef.compareAndSet(null, newChannel);
@@ -616,7 +588,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                                          messagesLost.get());
                             }
                         } else {
-                            Throwable cause = future.getCause();
+                            Throwable cause = future.cause();
                             reschedule(cause);
                             if (newChannel != null) {
                                 newChannel.close();
@@ -626,8 +598,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
                 });
             } else {
                 close();
-                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                                           connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after "
+                    + connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index e3bc8a5..27ccd04 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -15,40 +15,36 @@ package org.apache.storm.messaging.netty;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.Config;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.util.HashedWheelTimer;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.util.HashedWheelTimer;
 import org.apache.storm.utils.ObjectReader;
 
 public class Context implements IContext {
     private Map<String, Object> topoConf;
     private List<Server> serverConnections;
-    private NioClientSocketChannelFactory clientChannelFactory;
+    private EventLoopGroup workerEventLoopGroup;
     private HashedWheelTimer clientScheduleService;
 
     /**
      * initialization per Storm configuration
      */
+    @Override
     public void prepare(Map<String, Object> topoConf) {
         this.topoConf = topoConf;
         serverConnections = new ArrayList<>();
 
-        //each context will have a single client channel factory
+        //each context will have a single client channel worker event loop group
         int maxWorkers = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
-        if (maxWorkers > 0) {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                                     Executors.newCachedThreadPool(workerFactory), maxWorkers);
-        } else {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                                     Executors.newCachedThreadPool(workerFactory));
-        }
+        // 0 means DEFAULT_EVENT_LOOP_THREADS
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
 
         clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
     }
@@ -56,6 +52,7 @@ public class Context implements IContext {
     /**
      * establish a server with a binding port
      */
+    @Override
     public synchronized IConnection bind(String storm_id, int port) {
         Server server = new Server(topoConf, port);
         serverConnections.add(server);
@@ -65,14 +62,16 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
+    @Override
     public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
-        return new Client(topoConf, remoteBpStatus, clientChannelFactory,
+        return new Client(topoConf, remoteBpStatus, workerEventLoopGroup,
                                         clientScheduleService, host, port);
     }
 
     /**
      * terminate this context
      */
+    @Override
     public synchronized void term() {
         clientScheduleService.stop();
 
@@ -81,8 +80,8 @@ public class Context implements IContext {
         }
         serverConnections = null;
 
-        //we need to release resources associated with client channel factory
-        clientChannelFactory.releaseExternalResources();
+        //we need to release resources associated with the worker event loop group
+        workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
 
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
index c8b8c2c..3836faf 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
@@ -12,10 +12,8 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
 
 public enum ControlMessage implements INettySerializable {
     CLOSE_MESSAGE((short) -100),
@@ -25,7 +23,7 @@ public enum ControlMessage implements INettySerializable {
     SASL_TOKEN_MESSAGE_REQUEST((short) -202),
     SASL_COMPLETE_REQUEST((short) -203);
 
-    private short code;
+    private final short code;
 
     //private constructor
     private ControlMessage(short code) {
@@ -46,27 +44,21 @@ public enum ControlMessage implements INettySerializable {
     }
 
     public static ControlMessage read(byte[] serial) {
-        ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial);
+        ByteBuf cm_buffer = Unpooled.wrappedBuffer(serial);
+        try{
         return mkMessage(cm_buffer.getShort(0));
+        } finally {
+            cm_buffer.release();
+        }
     }
 
+    @Override
     public int encodeLength() {
         return 2; //short
     }
 
-    /**
-     * encode the current Control Message into a channel buffer
-     *
-     * @throws IOException
-     */
-    public ChannelBuffer buffer() throws IOException {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
-        write(bout);
-        bout.close();
-        return bout.buffer();
-    }
-
-    public void write(ChannelBufferOutputStream bout) throws IOException {
-        bout.writeShort(code);
+    @Override
+    public void write(ByteBuf buf) {
+        buf.writeShort(code);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
index a804abc..a12a96c 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
@@ -12,11 +12,14 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
 
 public interface INettySerializable {
-    ChannelBuffer buffer() throws IOException;
+    /**
+     * Serialize this object to ByteBuf.
+     * @param dest The ByteBuf to serialize to
+     */
+    void write(ByteBuf dest);
 
     int encodeLength();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
index 3ebe0dc..813448a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
@@ -12,12 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.Channel;
 
 public interface ISaslClient {
-    void channelConnected(Channel channel);
-
-    void channelReady();
+    void channelReady(Channel channel);
 
     String name();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
index 32af385..07f47a5 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
@@ -12,7 +12,7 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.Channel;
 
 public interface ISaslServer extends IServer {
     String name();

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
index 977a38b..4b095d6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
@@ -12,12 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.Channel;
 
 public interface IServer {
-    void channelConnected(Channel c);
+    void channelActive(Channel c);
 
     void received(Object message, String remote, Channel channel) throws InterruptedException;
-
-    void closeChannel(Channel c);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
index 5dbf4b6..472f45b 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -14,27 +14,24 @@ package org.apache.storm.messaging.netty;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
+public class KerberosSaslClientHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(KerberosSaslClientHandler.class);
-    long start_time;
-    private ISaslClient client;
+    private final long start_time;
+    private final ISaslClient client;
     /**
      * Used for client or server's token to send or receive from each other.
      */
-    private Map<String, Object> topoConf;
-    private String jaas_section;
-    private String host;
+    private final Map<String, Object> topoConf;
+    private final String jaas_section;
+    private final String host;
 
     public KerberosSaslClientHandler(ISaslClient client, Map<String, Object> topoConf, String jaas_section, String host) throws
         IOException {
@@ -46,85 +43,89 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-                                 ChannelStateEvent event) {
+    public void channelActive(ChannelHandlerContext ctx) {
         // register the newly established channel
-        Channel channel = ctx.getChannel();
-        client.channelConnected(channel);
+        Channel channel = ctx.channel();
 
         LOG.info("Connection established from {} to {}",
-                 channel.getLocalAddress(), channel.getRemoteAddress());
+                 channel.localAddress(), channel.remoteAddress());
 
         try {
-            KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
-                .get(channel);
+            KerberosSaslNettyClient saslNettyClient = channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).get();
 
             if (saslNettyClient == null) {
                 LOG.debug("Creating saslNettyClient now for channel: {}",
                           channel);
                 saslNettyClient = new KerberosSaslNettyClient(topoConf, jaas_section, host);
-                KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
-                                                                            saslNettyClient);
+                channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).set(saslNettyClient);
             }
             LOG.debug("Going to initiate Kerberos negotiations.");
             byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
             LOG.debug("Sending initial challenge: {}", initialChallenge);
-            channel.write(new SaslMessageToken(initialChallenge));
+            channel.writeAndFlush(new SaslMessageToken(initialChallenge), channel.voidPromise());
         } catch (Exception e) {
             LOG.error("Failed to authenticate with server due to error: ",
                       e);
         }
-        return;
-
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
-        throws Exception {
-        LOG.debug("send/recv time (ms): {}",
-                  (System.currentTimeMillis() - start_time));
+    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
+        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
 
-        Channel channel = ctx.getChannel();
+        // examine the response message from server
+        if (message instanceof ControlMessage) {
+            handleControlMessage(ctx, (ControlMessage) message);
+        } else if (message instanceof SaslMessageToken) {
+            handleSaslMessageToken(ctx, (SaslMessageToken) message);
+        } else {
+            LOG.error("Unexpected message from server: {}", message);
+        }
+    }
 
+    private KerberosSaslNettyClient getChannelSaslClient(Channel channel) throws Exception {
         // Generate SASL response to server using Channel-local SASL client.
-        KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
-            .get(channel);
+        KerberosSaslNettyClient saslNettyClient = channel.attr(KerberosSaslNettyClientState.KERBEROS_SASL_NETTY_CLIENT).get();
         if (saslNettyClient == null) {
             throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
         }
+        return saslNettyClient;
+    }
 
-        // examine the response message from server
-        if (event.getMessage() instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage) event.getMessage();
-            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+    private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception {
+        Channel channel = ctx.channel();
+        KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
+        if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
                 LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
 
                 if (!saslNettyClient.isComplete()) {
-                    String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
-                    LOG.error(message);
-                    throw new Exception(message);
+                String errorMessage =
+                    "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
+                LOG.error(errorMessage);
+                throw new Exception(errorMessage);
                 }
-                ctx.getPipeline().remove(this);
-                this.client.channelReady();
+            ctx.pipeline().remove(this);
+            this.client.channelReady(channel);
 
-                // We call fireMessageReceived since the client is allowed to
+            // We call fireChannelRead since the client is allowed to
                 // perform this request. The client's request will now proceed
                 // to the next pipeline component namely StormClientHandler.
-                Channels.fireMessageReceived(ctx, msg);
+            ctx.fireChannelRead(controlMessage);
             } else {
-                LOG.warn("Unexpected control message: {}", msg);
+            LOG.warn("Unexpected control message: {}", controlMessage);
             }
-            return;
-        } else if (event.getMessage() instanceof SaslMessageToken) {
-            SaslMessageToken saslTokenMessage = (SaslMessageToken) event
-                .getMessage();
+    }
+
+    private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
+        Channel channel = ctx.channel();
+        KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             LOG.debug("Responding to server's token of length: {}",
-                      saslTokenMessage.getSaslToken().length);
+            saslMessageToken.getSaslToken().length);
 
             // Generate SASL response (but we only actually send the response if
             // it's non-null.
             byte[] responseToServer = saslNettyClient
-                .saslResponse(saslTokenMessage);
+            .saslResponse(saslMessageToken);
             if (responseToServer == null) {
                 // If we generate a null response, then authentication has completed
                 // (if not, warn), and return without sending a response back to the
@@ -134,18 +135,14 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
                     LOG.warn("Generated a null response, but authentication is not complete.");
                     throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
                 }
-                this.client.channelReady();
-                return;
+            this.client.channelReady(channel);
             } else {
                 LOG.debug("Response to server token has length: {}",
                           responseToServer.length);
-            }
             // Construct a message containing the SASL response and send it to the
             // server.
             SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
-            channel.write(saslResponse);
-        } else {
-            LOG.error("Unexpected message from server: {}", event.getMessage());
+            channel.writeAndFlush(saslResponse, channel.voidPromise());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
index fbfd785..29746ff 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -12,15 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class KerberosSaslNettyClientState {
 
-    public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
-        protected KerberosSaslNettyClient initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<KerberosSaslNettyClient> KERBEROS_SASL_NETTY_CLIENT = AttributeKey.valueOf("kerberos.sasl.netty.client");
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
index 302903d..1816ffc 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -12,14 +12,9 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class KerberosSaslNettyServerState {
 
-    public static final ChannelLocal<KerberosSaslNettyServer> getKerberosSaslNettyServer = new ChannelLocal<KerberosSaslNettyServer>() {
-        protected KerberosSaslNettyServer initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<KerberosSaslNettyServer> KERBOROS_SASL_NETTY_SERVER = AttributeKey.valueOf("kerboros.sasl.netty.server");
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
index da94d0b..0356538 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -15,26 +15,23 @@ package org.apache.storm.messaging.netty;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
+public class KerberosSaslServerHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(KerberosSaslServerHandler.class);
-    ISaslServer server;
+    private final ISaslServer server;
     /**
      * Used for client or server's token to send or receive from each other.
      */
-    private Map<String, Object> topoConf;
-    private String jaas_section;
-    private List<String> authorizedUsers;
+    private final Map<String, Object> topoConf;
+    private final String jaas_section;
+    private final List<String> authorizedUsers;
 
     public KerberosSaslServerHandler(ISaslServer server, Map<String, Object> topoConf, String jaas_section,
                                      List<String> authorizedUsers) throws IOException {
@@ -45,15 +42,12 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             return;
         }
 
-        Channel channel = ctx.getChannel();
-
+        Channel channel = ctx.channel();
 
         if (msg instanceof SaslMessageToken) {
             // initialize server-side SASL functionality, if we haven't yet
@@ -62,22 +56,20 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
             try {
                 LOG.debug("Got SaslMessageToken!");
 
-                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
-                    .get(channel);
+                KerberosSaslNettyServer saslNettyServer = channel.attr(KerberosSaslNettyServerState.KERBOROS_SASL_NETTY_SERVER).get();
                 if (saslNettyServer == null) {
                     LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
                     try {
                         saslNettyServer = new KerberosSaslNettyServer(topoConf, jaas_section, authorizedUsers);
-                        KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
-                                                                                    saslNettyServer);
+                        channel.attr(KerberosSaslNettyServerState.KERBOROS_SASL_NETTY_SERVER).set(saslNettyServer);
                     } catch (RuntimeException ioe) {
                         LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
-                                  channel.getLocalAddress(), channel.getRemoteAddress());
+                                  channel.localAddress(), channel.remoteAddress());
                         throw ioe;
                     }
                 } else {
                     LOG.debug("Found existing saslNettyServer on server: {} for client {}",
-                              channel.getLocalAddress(), channel.getRemoteAddress());
+                              channel.localAddress(), channel.remoteAddress());
                 }
 
                 byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
@@ -86,10 +78,10 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
                 SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
 
                 if (saslTokenMessageRequest.getSaslToken() == null) {
-                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                    channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                 } else {
                     // Send response to client.
-                    channel.write(saslTokenMessageRequest);
+                    channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
                 }
 
                 if (saslNettyServer.isComplete()) {
@@ -97,12 +89,11 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
                     // SASL-Complete message to the client.
                     LOG.info("SASL authentication is complete for client with username: {}",
                              saslNettyServer.getUserName());
-                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                    channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                     LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
-                    ctx.getPipeline().remove(this);
+                    ctx.pipeline().remove(this);
                     server.authenticated(channel);
                 }
-                return;
             } catch (Exception ex) {
                 LOG.error("Failed to handle SaslMessageToken: ", ex);
                 throw ex;
@@ -115,15 +106,13 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
             // authentication has not completed.
             LOG.warn("Sending upstream an unexpected non-SASL message : {}",
                      msg);
-            Channels.fireMessageReceived(ctx, msg);
+            ctx.fireChannelRead(msg);
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        if (server != null) {
-            server.closeChannel(e.getChannel());
-        }
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
index acf3893..e29d43c 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java
@@ -14,13 +14,12 @@ package org.apache.storm.messaging.netty;
 
 import java.util.ArrayList;
 import org.apache.storm.messaging.TaskMessage;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
 
-class MessageBatch {
-    private int buffer_size;
-    private ArrayList<TaskMessage> msgs;
+class MessageBatch implements INettySerializable {
+
+    private final int buffer_size;
+    private final ArrayList<TaskMessage> msgs;
     private int encoded_length;
 
     MessageBatch(int buffer_size) {
@@ -38,7 +37,6 @@ class MessageBatch {
         encoded_length += msgEncodeLength(msg);
     }
 
-
     private int msgEncodeLength(TaskMessage taskMsg) {
         if (taskMsg == null) {
             return 0;
@@ -72,30 +70,30 @@ class MessageBatch {
         return msgs.size();
     }
 
+    @Override
+    public int encodeLength() {
+        return encoded_length;
+    }
+    
     /**
      * create a buffer containing the encoding of this batch
      */
-    ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
-
+    @Override
+    public void write(ByteBuf dest) {
         for (TaskMessage msg : msgs) {
-            writeTaskMessage(bout, msg);
+            writeTaskMessage(dest, msg);
         }
 
         //add a END_OF_BATCH indicator
-        ControlMessage.EOB_MESSAGE.write(bout);
-
-        bout.close();
-
-        return bout.buffer();
+        ControlMessage.EOB_MESSAGE.write(dest);
     }
 
     /**
-     * write a TaskMessage into a stream
+     * write a TaskMessage into a buffer
      *
      * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload ... byte[]     *
      */
-    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+    private void writeTaskMessage(ByteBuf buf, TaskMessage message) {
         int payload_len = 0;
         if (message.message() != null) {
             payload_len = message.message().length;
@@ -106,10 +104,10 @@ class MessageBatch {
             throw new RuntimeException("Task ID should not exceed " + Short.MAX_VALUE);
         }
 
-        bout.writeShort((short) task_id);
-        bout.writeInt(payload_len);
+        buf.writeShort((short) task_id);
+        buf.writeInt(payload_len);
         if (payload_len > 0) {
-            bout.write(message.message());
+            buf.writeBytes(message.message());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
index 5b800f1..fcd8f0e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -16,14 +16,13 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
 
-public class MessageDecoder extends FrameDecoder {
+public class MessageDecoder extends ByteToMessageDecoder {
 
-    private KryoValuesDeserializer deser;
+    private final KryoValuesDeserializer deser;
 
     public MessageDecoder(KryoValuesDeserializer deser) {
         this.deser = deser;
@@ -37,12 +36,13 @@ public class MessageDecoder extends FrameDecoder {
      *  len ... int(4)
      *  payload ... byte[]     *
      */
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
         // Make sure that we have received at least a short 
         long available = buf.readableBytes();
         if (available < 2) {
             //need more data
-            return null;
+            return;
         }
 
         List<Object> ret = new ArrayList<>();
@@ -67,7 +67,8 @@ public class MessageDecoder extends FrameDecoder {
                 if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
                     continue;
                 } else {
-                    return ctrl_msg;
+                    out.add(ctrl_msg);
+                    return;
                 }
             }
 
@@ -77,28 +78,30 @@ public class MessageDecoder extends FrameDecoder {
                 if (buf.readableBytes() < 4) {
                     //need more data
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
 
                 // Read the length field.
                 int length = buf.readInt();
                 if (length <= 0) {
-                    return new SaslMessageToken(null);
+                    out.add(new SaslMessageToken(null));
+                    return;
                 }
 
                 // Make sure if there's enough bytes in the buffer.
                 if (buf.readableBytes() < length) {
                     // The whole bytes were not received yet - return null.
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
 
                 // There's enough bytes in the buffer. Read it.  
-                ChannelBuffer payload = buf.readBytes(length);
-
+                byte[] bytes = new byte[length];
+                buf.readBytes(bytes);
                 // Successfully decoded a frame.
                 // Return a SaslTokenMessageRequest object
-                return new SaslMessageToken(payload.array());
+                out.add(new SaslMessageToken(bytes));
+                return;
             }
 
             // case 3: BackPressureStatus
@@ -107,18 +110,18 @@ public class MessageDecoder extends FrameDecoder {
                 if (available < 4) {
                     //Need  more data
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
                 int dataLen = buf.readInt();
                 if (available < 4 + dataLen) {
                     // need more data
                     buf.resetReaderIndex();
-                    return null;
+                    return;
                 }
                 byte[] bytes = new byte[dataLen];
                 buf.readBytes(bytes);
-                return BackPressureStatus.read(bytes, deser);
-
+                out.add(BackPressureStatus.read(bytes, deser));
+                return;
             }
 
             // case 4: task Message
@@ -149,18 +152,16 @@ public class MessageDecoder extends FrameDecoder {
             available -= length;
 
             // There's enough bytes in the buffer. Read it.
-            ChannelBuffer payload = buf.readBytes(length);
-
+            byte[] bytes = new byte[length];
+            buf.readBytes(bytes);
 
             // Successfully decoded a frame.
             // Return a TaskMessage object
-            ret.add(new TaskMessage(code, payload.array()));
+            ret.add(new TaskMessage(code, bytes));
         }
 
-        if (ret.size() == 0) {
-            return null;
-        } else {
-            return ret;
+        if (!ret.isEmpty()) {
+            out.add(ret);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
deleted file mode 100644
index d2e528c..0000000
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageEncoder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.messaging.netty;
-
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-public class MessageEncoder extends OneToOneEncoder {
-
-    private KryoValuesSerializer ser;
-
-    public MessageEncoder(KryoValuesSerializer ser) {
-        this.ser = ser;
-    }
-
-    @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
-        if (obj instanceof ControlMessage) {
-            return ((ControlMessage) obj).buffer();
-        }
-
-        if (obj instanceof MessageBatch) {
-            return ((MessageBatch) obj).buffer();
-        }
-
-        if (obj instanceof BackPressureStatus) {
-            return ((BackPressureStatus) obj).buffer(ser);
-        }
-
-        if (obj instanceof SaslMessageToken) {
-            return ((SaslMessageToken) obj).buffer();
-        }
-
-        throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
index 3fe98c4..f66510e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -14,21 +14,14 @@ package org.apache.storm.messaging.netty;
 
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.shade.org.jboss.netty.util.ThreadNameDeterminer;
-import org.apache.storm.shade.org.jboss.netty.util.ThreadRenamingRunnable;
 
 public class NettyRenameThreadFactory implements ThreadFactory {
 
-    static final NettyUncaughtExceptionHandler uncaughtExceptionHandler = new NettyUncaughtExceptionHandler();
+    private static final NettyUncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = new NettyUncaughtExceptionHandler();
 
-    static {
-        //Rename Netty threads
-        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
-    }
-
-    final ThreadGroup group;
-    final AtomicInteger index = new AtomicInteger(1);
-    final String name;
+    private final ThreadGroup group;
+    private final AtomicInteger index = new AtomicInteger(1);
+    private final String name;
 
     public NettyRenameThreadFactory(String name) {
         SecurityManager s = System.getSecurityManager();
@@ -37,6 +30,7 @@ public class NettyRenameThreadFactory implements ThreadFactory {
         this.name = name;
     }
 
+    @Override
     public Thread newThread(Runnable r) {
         Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
         if (t.isDaemon()) {
@@ -45,7 +39,7 @@ public class NettyRenameThreadFactory implements ThreadFactory {
         if (t.getPriority() != Thread.NORM_PRIORITY) {
             t.setPriority(Thread.NORM_PRIORITY);
         }
-        t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        t.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
         return t;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java
new file mode 100644
index 0000000..0eab349
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettySerializableMessageEncoder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToByteEncoder;
+
+@ChannelHandler.Sharable
+public class NettySerializableMessageEncoder extends MessageToByteEncoder<INettySerializable> {
+
+    public static final NettySerializableMessageEncoder INSTANCE = new NettySerializableMessageEncoder();
+    
+    private NettySerializableMessageEncoder() {}
+    
+    @Override
+    protected void encode(ChannelHandlerContext ctx, INettySerializable msg, ByteBuf out) throws Exception {
+        msg.write(out);
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, INettySerializable msg, boolean preferDirect) throws Exception {
+        return ctx.alloc().ioBuffer(msg.encodeLength());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
index cafc109..669c88a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java
@@ -12,10 +12,8 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +50,8 @@ public class SaslMessageToken implements INettySerializable {
     }
 
     public static SaslMessageToken read(byte[] serial) {
-        ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial);
+        ByteBuf sm_buffer = Unpooled.wrappedBuffer(serial);
+        try {
         short identifier = sm_buffer.readShort();
         int payload_len = sm_buffer.readInt();
         if (identifier != IDENTIFIER) {
@@ -61,6 +60,9 @@ public class SaslMessageToken implements INettySerializable {
         byte token[] = new byte[payload_len];
         sm_buffer.readBytes(token, 0, payload_len);
         return new SaslMessageToken(token);
+        } finally {
+            sm_buffer.release();
+        }
     }
 
     /**
@@ -81,31 +83,29 @@ public class SaslMessageToken implements INettySerializable {
         this.token = token;
     }
 
+    @Override
     public int encodeLength() {
         return 2 + 4 + token.length;
     }
 
     /**
-     * encode the current SaslToken Message into a channel buffer SaslTokenMessageRequest is encoded as: identifier .... short(2) payload
+     * encode the current SaslToken Message into a ByteBuf.
+     * 
+     * <p>SaslTokenMessageRequest is encoded as: identifier .... short(2) payload
      * length .... int payload .... byte[]
-     *
-     * @throws IOException
      */
-    public ChannelBuffer buffer() throws IOException {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
-            ChannelBuffers.directBuffer(encodeLength()));
+    @Override
+    public void write(ByteBuf dest) {
         int payload_len = 0;
         if (token != null) {
             payload_len = token.length;
         }
 
-        bout.writeShort(IDENTIFIER);
-        bout.writeInt(payload_len);
+        dest.writeShort(IDENTIFIER);
+        dest.writeInt(payload_len);
 
         if (payload_len > 0) {
-            bout.write(token);
+            dest.writeBytes(token);
         }
-        bout.close();
-        return bout.buffer();
     }
 }


[3/5] storm git commit: STORM-1038: Upgrade to latest Netty

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
index 45fa2d8..1818dfa 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
@@ -12,15 +12,10 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class SaslNettyClientState {
 
-    public static final ChannelLocal<SaslNettyClient> getSaslNettyClient = new ChannelLocal<SaslNettyClient>() {
-        protected SaslNettyClient initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<SaslNettyClient> SASL_NETTY_CLIENT = AttributeKey.valueOf("sasl.netty.client");
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
index f8cb386..2c79519 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
@@ -12,14 +12,9 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
 
 final class SaslNettyServerState {
 
-    public static final ChannelLocal<SaslNettyServer> getSaslNettyServer = new ChannelLocal<SaslNettyServer>() {
-        protected SaslNettyServer initialValue(Channel channel) {
-            return null;
-        }
-    };
+    public static final AttributeKey<SaslNettyServer> SASL_NETTY_SERVER = AttributeKey.valueOf("sasl.netty.server");
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
index 293cd38..25b0aa2 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -13,21 +14,18 @@
 package org.apache.storm.messaging.netty;
 
 import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormClientHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(SaslStormClientHandler.class);
-    long start_time;
-    private ISaslClient client;
+    private final long start_time;
+    private final ISaslClient client;
     /**
      * Used for client or server's token to send or receive from each other.
      */
@@ -41,25 +39,22 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-                                 ChannelStateEvent event) {
-        // register the newly established channel
-        Channel channel = ctx.getChannel();
-        client.channelConnected(channel);
+    public void channelActive(ChannelHandlerContext ctx) {
+        Channel channel = ctx.channel();
 
+        LOG.info("Connection established from " + channel.localAddress()
+            + " to " + channel.remoteAddress());
         try {
-            SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
-                .get(channel);
+            SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
 
             if (saslNettyClient == null) {
                 LOG.debug("Creating saslNettyClient now " + "for channel: "
                           + channel);
                 saslNettyClient = new SaslNettyClient(name, token);
-                SaslNettyClientState.getSaslNettyClient.set(channel,
-                                                            saslNettyClient);
+                channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient);
             }
             LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST");
-            channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+            channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST, channel.voidPromise());
         } catch (Exception e) {
             LOG.error("Failed to authenticate with server " + "due to error: ",
                       e);
@@ -67,54 +62,65 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
-        throws Exception {
+    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
         LOG.debug("send/recv time (ms): {}",
                   (System.currentTimeMillis() - start_time));
 
-        Channel channel = ctx.getChannel();
-
+        // examine the response message from server
+        if (message instanceof ControlMessage) {
+            handleControlMessage(ctx, (ControlMessage) message);
+        } else if (message instanceof SaslMessageToken) {
+            handleSaslMessageToken(ctx, (SaslMessageToken) message);
+        } else {
+            LOG.error("Unexpected message from server: {}", message);
+        }
+    }
+    
+    private SaslNettyClient getChannelSaslNettyClient(Channel channel) throws Exception {
         // Generate SASL response to server using Channel-local SASL client.
-        SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
-            .get(channel);
+        SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
         if (saslNettyClient == null) {
             throw new Exception("saslNettyClient was unexpectedly "
-                                + "null for channel: " + channel);
+                + "null for channel: " + channel);
         }
+        return saslNettyClient;
+    }
+    
+    private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception {
+        SaslNettyClient saslNettyClient = getChannelSaslNettyClient(ctx.channel());
+        if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
+            LOG.debug("Server has sent us the SaslComplete "
+                + "message. Allowing normal work to proceed.");
 
-        // examine the response message from server
-        if (event.getMessage() instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage) event.getMessage();
-            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
-                LOG.debug("Server has sent us the SaslComplete "
-                          + "message. Allowing normal work to proceed.");
-
-                if (!saslNettyClient.isComplete()) {
-                    LOG.error("Server returned a Sasl-complete message, "
-                              + "but as far as we can tell, we are not authenticated yet.");
-                    throw new Exception("Server returned a "
-                                        + "Sasl-complete message, but as far as "
-                                        + "we can tell, we are not authenticated yet.");
-                }
-                ctx.getPipeline().remove(this);
-                this.client.channelReady();
-
-                // We call fireMessageReceived since the client is allowed to
-                // perform this request. The client's request will now proceed
-                // to the next pipeline component namely StormClientHandler.
-                Channels.fireMessageReceived(ctx, msg);
-                return;
+            if (!saslNettyClient.isComplete()) {
+                LOG.error("Server returned a Sasl-complete message, "
+                    + "but as far as we can tell, we are not authenticated yet.");
+                throw new Exception("Server returned a "
+                    + "Sasl-complete message, but as far as "
+                    + "we can tell, we are not authenticated yet.");
             }
+            ctx.pipeline().remove(this);
+            this.client.channelReady(ctx.channel());
+
+            // We call fireMessageRead since the client is allowed to
+            // perform this request. The client's request will now proceed
+            // to the next pipeline component namely StormClientHandler.
+            ctx.fireChannelRead(controlMessage);
+        } else {
+            LOG.warn("Unexpected control message: {}", controlMessage);
         }
-        SaslMessageToken saslTokenMessage = (SaslMessageToken) event
-            .getMessage();
+    }
+    
+    private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
+        Channel channel = ctx.channel();
+        SaslNettyClient saslNettyClient = getChannelSaslNettyClient(channel);
         LOG.debug("Responding to server's token of length: "
-                  + saslTokenMessage.getSaslToken().length);
+                  + saslMessageToken.getSaslToken().length);
 
         // Generate SASL response (but we only actually send the response if
         // it's non-null.
         byte[] responseToServer = saslNettyClient
-            .saslResponse(saslTokenMessage);
+            .saslResponse(saslMessageToken);
         if (responseToServer == null) {
             // If we generate a null response, then authentication has completed
             // (if not, warn), and return without sending a response back to the
@@ -127,7 +133,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
                 throw new Exception("Server response is null, but as far as "
                                     + "we can tell, we are not authenticated yet.");
             }
-            this.client.channelReady();
+            this.client.channelReady(channel);
             return;
         } else {
             LOG.debug("Response to server token has length:"
@@ -136,7 +142,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
         // Construct a message containing the SASL response and send it to the
         // server.
         SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
-        channel.write(saslResponse);
+        channel.writeAndFlush(saslResponse, channel.voidPromise());
     }
 
     private void getSASLCredentials() throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
index 7db90db..64a8bae 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
@@ -12,21 +12,18 @@
 
 package org.apache.storm.messaging.netty;
 
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Authorize or deny client requests based on existence and completeness of client's SASL authentication.
  */
-public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormServerAuthorizeHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
-        .getLogger(SaslStormServerHandler.class);
+        .getLogger(SaslStormServerAuthorizeHandler.class);
 
     /**
      * Constructor.
@@ -35,19 +32,16 @@ public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandle
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             return;
         }
 
-        Channel channel = ctx.getChannel();
         LOG.debug("messageReceived: Checking whether the client is authorized to send messages to the server ");
 
         // Authorize: client is allowed to doRequest() if and only if the client
         // has successfully authenticated with this server.
-        SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-            .get(channel);
+        SaslNettyServer saslNettyServer = ctx.channel().attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
 
         if (saslNettyServer == null) {
             LOG.warn("messageReceived: This client is *NOT* authorized to perform "
@@ -70,9 +64,9 @@ public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandle
                   + saslNettyServer.getUserName()
                   + " is authorized to do request " + "on server.");
 
-        // We call fireMessageReceived since the client is allowed to perform
+        // We call fireChannelRead since the client is allowed to perform
         // this request. The client's request will now proceed to the next
         // pipeline component.
-        Channels.fireMessageReceived(ctx, msg);
+        ctx.fireChannelRead(msg);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
index 40f8916..ce69a6f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
@@ -13,20 +13,17 @@
 package org.apache.storm.messaging.netty;
 
 import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormServerHandler extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(SaslStormServerHandler.class);
-    ISaslServer server;
+    private final ISaslServer server;
     /**
      * Used for client or server's token to send or receive from each other.
      */
@@ -39,41 +36,39 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg == null) {
             return;
         }
 
-        Channel channel = ctx.getChannel();
+        Channel channel = ctx.channel();
 
         if (msg instanceof ControlMessage
-            && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+            && msg == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
             // initialize server-side SASL functionality, if we haven't yet
             // (in which case we are looking at the first SASL message from the
             // client).
-            SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-                .get(channel);
+            SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
             if (saslNettyServer == null) {
                 LOG.debug("No saslNettyServer for " + channel
-                          + " yet; creating now, with topology token: ");
+                          + " yet; creating now, with topology token: " + topologyName);
                 try {
                     saslNettyServer = new SaslNettyServer(topologyName, token);
-                } catch (IOException ioe) {
+                    LOG.debug("SaslNettyServer for " + channel
+                        + "created with topology token: " + topologyName);
+                } catch (IOException e) {
                     LOG.error("Error occurred while creating saslNettyServer on server "
-                              + channel.getLocalAddress()
+                              + channel.localAddress()
                               + " for client "
-                              + channel.getRemoteAddress());
-                    saslNettyServer = null;
+                              + channel.remoteAddress());
+                    throw new IllegalStateException("Failed to set SaslNettyServerState.SASL_NETTY_SERVER");
                 }
 
-                SaslNettyServerState.getSaslNettyServer.set(channel,
-                                                            saslNettyServer);
+                channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).set(saslNettyServer);
             } else {
                 LOG.debug("Found existing saslNettyServer on server:"
-                          + channel.getLocalAddress() + " for client "
-                          + channel.getRemoteAddress());
+                          + channel.localAddress() + " for client "
+                          + channel.remoteAddress());
             }
 
             LOG.debug("processToken:  With nettyServer: " + saslNettyServer
@@ -83,7 +78,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             saslTokenMessageRequest = new SaslMessageToken(
                 saslNettyServer.response(new byte[0]));
             // Send response to client.
-            channel.write(saslTokenMessageRequest);
+            channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
             // do not send upstream to other handlers: no further action needs
             // to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
             return;
@@ -93,8 +88,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             // initialize server-side SASL functionality, if we haven't yet
             // (in which case we are looking at the first SASL message from the
             // client).
-            SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
-                .get(channel);
+            SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
             if (saslNettyServer == null) {
                 throw new Exception("saslNettyServer was unexpectedly "
                                     + "null for channel: " + channel);
@@ -104,17 +98,17 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
                                              .getSaslToken()));
 
             // Send response to client.
-            channel.write(saslTokenMessageRequest);
+            channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
 
             if (saslNettyServer.isComplete()) {
                 // If authentication of client is complete, we will also send a
                 // SASL-Complete message to the client.
                 LOG.debug("SASL authentication is complete for client with "
                           + "username: " + saslNettyServer.getUserName());
-                channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
                 LOG.debug("Removing SaslServerHandler from pipeline since SASL "
                           + "authentication is complete.");
-                ctx.getPipeline().remove(this);
+                ctx.pipeline().remove(this);
                 server.authenticated(channel);
             }
         } else {
@@ -125,15 +119,13 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             // authentication has not completed.
             LOG.warn("Sending upstream an unexpected non-SASL message :  "
                      + msg);
-            Channels.fireMessageReceived(ctx, msg);
+            ctx.fireChannelRead(msg);
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        if (server != null) {
-            server.closeChannel(e.getChannel());
-        }
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.close();
     }
 
     private void getSASLCredentials() throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index d5b2c8a..6500abe 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -13,14 +13,13 @@
 package org.apache.storm.messaging.netty;
 
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -33,37 +32,45 @@ import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IStatefulObject;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ServerBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.group.ChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.ServerBootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelFuture;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.group.ChannelGroup;
+import org.apache.storm.shade.io.netty.channel.group.DefaultChannelGroup;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.storm.shade.io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer {
 
+    public static final int LOAD_METRICS_TASK_ID = -1;
+    
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
-    final ChannelFactory factory;
-    final ServerBootstrap bootstrap;
+    private final EventLoopGroup bossEventLoopGroup;
+    private final EventLoopGroup workerEventLoopGroup;
+    private final ServerBootstrap bootstrap;
     private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
     private final AtomicInteger messagesDequeued = new AtomicInteger(0);
     private final int boundPort;
-    Map<String, Object> topoConf;
-    int port;
-    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
-    KryoValuesSerializer _ser;
-    KryoValuesDeserializer deser;
+    private final Map<String, Object> topoConf;
+    private final int port;
+    private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
+    private final KryoValuesSerializer ser;
+    private final KryoValuesDeserializer deser;
     private volatile boolean closing = false;
-    private IConnectionCallback _cb = null;
+    private IConnectionCallback cb = null;
     private Supplier<Object> newConnectionResponse;
 
     Server(Map<String, Object> topoConf, int port) {
         this.topoConf = topoConf;
         this.port = port;
-        _ser = new KryoValuesSerializer(topoConf);
+        ser = new KryoValuesSerializer(topoConf);
         deser = new KryoValuesDeserializer(topoConf);
 
         // Configure the server.
@@ -74,30 +81,33 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         ThreadFactory bossFactory = new NettyRenameThreadFactory(netty_name() + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory(netty_name() + "-worker");
 
-        if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                        Executors.newCachedThreadPool(workerFactory), maxWorkers);
-        } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                        Executors.newCachedThreadPool(workerFactory));
-        }
+        bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory);
+        // 0 means DEFAULT_EVENT_LOOP_THREADS
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
 
         LOG.info("Create Netty Server " + netty_name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
 
-        bootstrap = new ServerBootstrap(factory);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.receiveBufferSize", buffer_size);
-        bootstrap.setOption("child.keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("backlog", backlog);
-
-        // Set up the pipeline factory.
-        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+        bootstrap = new ServerBootstrap()
+            .group(bossEventLoopGroup, workerEventLoopGroup)
+            .channel(NioServerSocketChannel.class)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .option(ChannelOption.SO_BACKLOG, backlog)
+            .childOption(ChannelOption.TCP_NODELAY, true)
+            .childOption(ChannelOption.SO_RCVBUF, buffer_size)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .childHandler(new StormServerPipelineFactory(ser, deser, topoConf, this));
 
         // Bind and start to accept incoming connections.
-        Channel channel = bootstrap.bind(new InetSocketAddress(port));
-        boundPort = ((InetSocketAddress) channel.getLocalAddress()).getPort();
-        allChannels.add(channel);
+        try {
+            ChannelFuture bindFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
+            Channel channel = bindFuture.channel();
+            boundPort = ((InetSocketAddress) channel.localAddress()).getPort();
+            allChannels.add(channel);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private void addReceiveCount(String from, int amount) {
@@ -124,18 +134,18 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
      * @throws InterruptedException
      */
     protected void enqueue(List<TaskMessage> msgs, String from) throws InterruptedException {
-        if (null == msgs || msgs.size() == 0 || closing) {
+        if (null == msgs || msgs.isEmpty() || closing) {
             return;
         }
         addReceiveCount(from, msgs.size());
-        if (_cb != null) {
-            _cb.recv(msgs);
+        if (cb != null) {
+            cb.recv(msgs);
         }
     }
 
     @Override
     public void registerRecv(IConnectionCallback cb) {
-        _cb = cb;
+        this.cb = cb;
     }
 
     @Override
@@ -143,14 +153,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         this.newConnectionResponse = newConnectionResponse;
     }
 
-    /**
-     * @param channel channel to close
-     */
-    public void closeChannel(Channel channel) {
-        channel.close().awaitUninterruptibly();
-        allChannels.remove(channel);
-    }
-
     @Override
     public int getPort() {
         return boundPort;
@@ -159,26 +161,25 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     /**
      * close all channels, and release resources
      */
-    public synchronized void close() {
-        if (allChannels != null) {
-            allChannels.close().awaitUninterruptibly();
-            factory.releaseExternalResources();
-            allChannels = null;
-        }
+    @Override
+    public void close() {
+        allChannels.close().awaitUninterruptibly();
+        workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
+        bossEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
     }
 
     @Override
-    synchronized public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+    public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
         MessageBatch mb = new MessageBatch(1);
-        mb.add(new TaskMessage(-1, _ser.serialize(Arrays.asList((Object) taskToLoad))));
-        allChannels.write(mb);
+        mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+        allChannels.writeAndFlush(mb);
     }
 
     // this method expected to be thread safe
     @Override
-    synchronized public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+    public void sendBackPressureStatus(BackPressureStatus bpStatus) {
         LOG.info("Sending BackPressure status update to connected workers. BPStatus = {}", bpStatus);
-        allChannels.write(bpStatus);
+        allChannels.writeAndFlush(bpStatus);
     }
 
     @Override
@@ -187,16 +188,11 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     }
 
     @Override
-    public void send(int task, byte[] message) {
-        throw new UnsupportedOperationException("Server connection should not send any messages");
-    }
-
-    @Override
     public void send(Iterator<TaskMessage> msgs) {
         throw new UnsupportedOperationException("Server connection should not send any messages");
     }
 
-    public String netty_name() {
+    public final String netty_name() {
         return "Netty-server-localhost-" + port;
     }
 
@@ -212,7 +208,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     }
 
     private boolean connectionEstablished(Channel channel) {
-        return channel != null && channel.isBound();
+        return channel != null && channel.isActive();
     }
 
     private boolean connectionEstablished(ChannelGroup allChannels) {
@@ -226,11 +222,12 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         return allEstablished;
     }
 
+    @Override
     public Object getState() {
         LOG.debug("Getting metrics for server on port {}", port);
         HashMap<String, Object> ret = new HashMap<>();
         ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
-        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+        HashMap<String, Integer> enqueued = new HashMap<>();
         Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<String, AtomicInteger> ent = it.next();
@@ -245,8 +242,8 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         ret.put("enqueued", enqueued);
 
         // Report messageSizes metric, if enabled (non-null).
-        if (_cb instanceof IMetric) {
-            Object metrics = ((IMetric) _cb).getValueAndReset();
+        if (cb instanceof IMetric) {
+            Object metrics = ((IMetric) cb).getValueAndReset();
             if (metrics instanceof Map) {
                 ret.put("messageBytes", metrics);
             }
@@ -258,28 +255,32 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     /**
      * Implementing IServer.
      **/
-    public void channelConnected(Channel c) {
+    @Override
+    public void channelActive(Channel c) {
         if (newConnectionResponse != null) {
-            c.write(newConnectionResponse.get()); // not synchronized since it is not yet in channel grp, so pvt to this thread
+            c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
         }
         allChannels.add(c);
     }
 
+    @Override
     public void received(Object message, String remote, Channel channel) throws InterruptedException {
         List<TaskMessage> msgs = (List<TaskMessage>) message;
         enqueue(msgs, remote);
     }
 
+    @Override
     public String name() {
         return (String) topoConf.get(Config.TOPOLOGY_NAME);
     }
 
+    @Override
     public String secretKey() {
         return SaslUtils.getSecretKey(topoConf);
     }
 
+    @Override
     public void authenticated(Channel c) {
-        return;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 9b63346..4c38344 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -18,19 +18,16 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StormClientHandler extends SimpleChannelUpstreamHandler {
+public class StormClientHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
-    private Client client;
-    private KryoValuesDeserializer _des;
-    private AtomicBoolean[] remoteBpStatus;
+    private final Client client;
+    private final KryoValuesDeserializer _des;
+    private final AtomicBoolean[] remoteBpStatus;
 
     StormClientHandler(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) {
         this.client = client;
@@ -39,9 +36,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
         //examine the response message from server
-        Object message = event.getMessage();
         if (message instanceof ControlMessage) {
             ControlMessage msg = (ControlMessage) message;
             if (msg == ControlMessage.FAILURE_RESPONSE) {
@@ -61,16 +57,17 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
             }
             LOG.debug("Received BackPressure status update : {}", status);
         } else if (message instanceof List) {
-            //This should be the metrics, and there should only be one of them
+            //This should be the load metrics. 
+            //There will usually only be one message, but if there are multiple we only process the latest one.
             List<TaskMessage> list = (List<TaskMessage>) message;
             if (list.size() < 1) {
                 throw new RuntimeException("Didn't see enough load metrics (" + client.getDstAddress() + ") " + list);
             }
-            TaskMessage tm = ((List<TaskMessage>) message).get(list.size() - 1);
-            if (tm.task() != -1) {
+            TaskMessage tm = list.get(list.size() - 1);
+            if (tm.task() != Server.LOAD_METRICS_TASK_ID) {
                 throw new RuntimeException("Metrics messages are sent to the system task (" + client.getDstAddress() + ") " + tm);
             }
-            List metrics = _des.deserialize(tm.message());
+            List<Object> metrics = _des.deserialize(tm.message());
             if (metrics.size() < 1) {
                 throw new RuntimeException("No metrics data in the metrics message (" + client.getDstAddress() + ") " + metrics);
             }
@@ -85,13 +82,7 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        client.notifyInterestChanged(e.getChannel());
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable cause = event.getCause();
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         if (!(cause instanceof ConnectException)) {
             LOG.info("Connection to " + client.getDstAddress() + " failed:", cause);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
index 1808d3e..2a790ef 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
@@ -15,14 +15,14 @@ package org.apache.storm.messaging.netty;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.Config;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
 
-class StormClientPipelineFactory implements ChannelPipelineFactory {
-    private Client client;
-    private AtomicBoolean[] remoteBpStatus;
-    private Map<String, Object> conf;
+class StormClientPipelineFactory extends ChannelInitializer<Channel> {
+    private final Client client;
+    private final AtomicBoolean[] remoteBpStatus;
+    private final Map<String, Object> conf;
 
     StormClientPipelineFactory(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) {
         this.client = client;
@@ -30,14 +30,15 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         this.conf = conf;
     }
 
-    public ChannelPipeline getPipeline() throws Exception {
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
         // Create a default pipeline implementation.
-        ChannelPipeline pipeline = Channels.pipeline();
+        ChannelPipeline pipeline = ch.pipeline();
 
         // Decoder
         pipeline.addLast("decoder", new MessageDecoder(client.deser));
         // Encoder
-        pipeline.addLast("encoder", new MessageEncoder(client.ser));
+        pipeline.addLast("encoder", NettySerializableMessageEncoder.INSTANCE);
 
         boolean isNettyAuth = (Boolean) conf
             .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
@@ -48,6 +49,5 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         }
         // business logic.
         pipeline.addLast("handler", new StormClientHandler(client, remoteBpStatus, conf));
-        return pipeline;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 7138f94..55e7058 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -17,22 +17,18 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StormServerHandler extends SimpleChannelUpstreamHandler {
+public class StormServerHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
-    private static final Set<Class> allowedExceptions = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
-    IServer server;
-    private AtomicInteger failure_count;
-    private Channel channel;
+    private static final Set<Class> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
+    private final IServer server;
+    private final AtomicInteger failure_count;
 
     public StormServerHandler(IServer server) {
         this.server = server;
@@ -40,40 +36,35 @@ public class StormServerHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-        server.channelConnected(e.getChannel());
-        if (channel != null) {
-            LOG.debug("Replacing channel with new channel: {} -> ",
-                      channel, e.getChannel());
-        }
-        channel = e.getChannel();
-        server.channelConnected(channel);
+    public void channelActive(ChannelHandlerContext ctx) {
+        server.channelActive(ctx.channel());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msgs = e.getMessage();
-        if (msgs == null) {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg == null) {
             return;
         }
 
+        Channel channel = ctx.channel();
         try {
-            server.received(msgs, e.getRemoteAddress().toString(), channel);
-        } catch (InterruptedException e1) {
+            server.received(msg, channel.remoteAddress().toString(), channel);
+        } catch (InterruptedException e) {
             LOG.info("failed to enqueue a request message", e);
             failure_count.incrementAndGet();
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         try {
-            LOG.error("server errors in handling the request", e.getCause());
+            LOG.error("server errors in handling the request", cause);
         } catch (Throwable err) {
             // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
         }
         try {
-            Utils.handleUncaughtException(e.getCause(), allowedExceptions);
+            Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+            ctx.close();
         } catch (Error error) {
             LOG.info("Received error in netty thread.. terminating server...");
             Runtime.getRuntime().exit(1);

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
index 068843f..6e22fb7 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -12,28 +13,41 @@
 
 package org.apache.storm.messaging.netty;
 
+import java.util.Map;
 import org.apache.storm.Config;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-
-class StormServerPipelineFactory implements ChannelPipelineFactory {
-    private Server server;
-
-    StormServerPipelineFactory(Server server) {
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
+
+class StormServerPipelineFactory extends ChannelInitializer<Channel> {
+
+    private final KryoValuesSerializer ser;
+    private final KryoValuesDeserializer deser;
+    private final Map<String, Object> topoConf;
+    private final Server server;
+
+    StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
+        Map<String, Object> topoConf, Server server) {
+        this.ser = ser;
+        this.deser = deser;
+        this.topoConf = topoConf;
         this.server = server;
     }
 
-    public ChannelPipeline getPipeline() throws Exception {
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
         // Create a default pipeline implementation.
-        ChannelPipeline pipeline = Channels.pipeline();
+        ChannelPipeline pipeline = ch.pipeline();
 
         // Decoder
-        pipeline.addLast("decoder", new MessageDecoder(server.deser));
-        // Encoder
-        pipeline.addLast("encoder", new MessageEncoder(server._ser));
+        pipeline.addLast("decoder", new MessageDecoder(deser));
+        // Encoders
+        pipeline.addLast("netty-serializable-encoder", NettySerializableMessageEncoder.INSTANCE);
+        pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(ser));
 
-        boolean isNettyAuth = (Boolean) this.server.topoConf
+        boolean isNettyAuth = (Boolean) topoConf
             .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
         if (isNettyAuth) {
             // Authenticate: Removed after authentication completes
@@ -41,11 +55,9 @@ class StormServerPipelineFactory implements ChannelPipelineFactory {
                 server));
             // Authorize
             pipeline.addLast("authorizeServerHandler",
-                             new SaslStormServerAuthorizeHandler());
+                new SaslStormServerAuthorizeHandler());
         }
         // business logic.
         pipeline.addLast("handler", new StormServerHandler(server));
-
-        return pipeline;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index ccaa0f8..c24d8db 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -16,7 +16,6 @@ import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,10 +27,14 @@ import org.apache.storm.messaging.netty.ISaslClient;
 import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
 import org.apache.storm.pacemaker.codec.ThriftNettyClientCodec;
 import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ClientBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.Bootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +43,8 @@ public class PacemakerClient implements ISaslClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(PacemakerClient.class);
     private static Timer timer = new Timer(true);
-    private final ClientBootstrap bootstrap;
+    private final Bootstrap bootstrap;
+    private final EventLoopGroup workerEventLoopGroup;
     private String client_name;
     private String secret;
     private AtomicBoolean ready;
@@ -56,11 +60,6 @@ public class PacemakerClient implements ISaslClient {
     private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
     private int retryTimes = 0;
 
-    //the constructor is invoked by pacemaker-state-factory-test
-    public PacemakerClient() {
-        bootstrap = new ClientBootstrap();
-    }
-
     public PacemakerClient(Map<String, Object> config, String host) {
 
         int port = (int) config.get(Config.PACEMAKER_PORT);
@@ -68,9 +67,9 @@ public class PacemakerClient implements ISaslClient {
         if (client_name == null) {
             client_name = "pacemaker-client";
         }
+        int maxWorkers = (int)config.get(Config.PACEMAKER_CLIENT_MAX_THREADS);
 
         String auth = (String) config.get(Config.PACEMAKER_AUTH_METHOD);
-        ThriftNettyClientCodec.AuthMethod authMethod;
 
         switch (auth) {
 
@@ -100,23 +99,25 @@ public class PacemakerClient implements ISaslClient {
 
         ready = new AtomicBoolean(false);
         shutdown = new AtomicBoolean(false);
-        channelRef = new AtomicReference<Channel>(null);
+        channelRef = new AtomicReference<>(null);
         setupMessaging();
 
-        ThreadFactory bossFactory = new NettyRenameThreadFactory("client-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client-worker");
-        NioClientSocketChannelFactory factory =
-            new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
-        bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", 5242880);
-        bootstrap.setOption("keepAlive", true);
+        // 0 means DEFAULT_EVENT_LOOP_THREADS
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
+        int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
+        bootstrap = new Bootstrap()
+            .group(workerEventLoopGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_SNDBUF, 5242880)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .handler(new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize));
 
         remote_addr = new InetSocketAddress(host, port);
-        int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
-        ChannelPipelineFactory pipelineFactory =
-            new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize).pipelineFactory();
-        bootstrap.setPipelineFactory(pipelineFactory);
         bootstrap.connect(remote_addr);
     }
 
@@ -129,27 +130,16 @@ public class PacemakerClient implements ISaslClient {
     }
 
     @Override
-    public synchronized void channelConnected(Channel channel) {
+    public synchronized void channelReady(Channel channel) {
         Channel oldChannel = channelRef.get();
         if (oldChannel != null) {
             LOG.debug("Closing oldChannel is connected: {}", oldChannel.toString());
             close_channel();
         }
 
-        LOG.debug("Channel is connected: {}", channel.toString());
         channelRef.set(channel);
-
-        //If we're not going to authenticate, we can begin sending.
-        if (authMethod == ThriftNettyClientCodec.AuthMethod.NONE) {
-            ready.set(true);
-            this.notifyAll();
-        }
         retryTimes = 0;
-    }
-
-    @Override
-    public synchronized void channelReady() {
-        LOG.debug("Channel is ready.");
+        LOG.debug("Channel is ready: {}", channel.toString());
         ready.set(true);
         this.notifyAll();
     }
@@ -176,7 +166,7 @@ public class PacemakerClient implements ISaslClient {
                     waitUntilReady();
                     Channel channel = channelRef.get();
                     if (channel != null) {
-                        channel.write(m);
+                        channel.writeAndFlush(m, channel.voidPromise());
                         m.wait(1000);
                     }
                     if (messages[next] != m && messages[next] != null) {
@@ -257,7 +247,7 @@ public class PacemakerClient implements ISaslClient {
 
     public void shutdown() {
         shutdown.set(true);
-        bootstrap.releaseExternalResources();
+        workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
     }
 
     private synchronized void close_channel() {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
index 49525db..b81f02d 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
@@ -15,57 +15,49 @@ package org.apache.storm.pacemaker;
 import java.net.ConnectException;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.messaging.netty.ControlMessage;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PacemakerClientHandler extends SimpleChannelUpstreamHandler {
+public class PacemakerClientHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(PacemakerClientHandler.class);
 
-    private PacemakerClient client;
+    private final PacemakerClient client;
 
     public PacemakerClientHandler(PacemakerClient client) {
         this.client = client;
     }
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-                                 ChannelStateEvent event) {
+    public void channelActive(ChannelHandlerContext ctx) {
         // register the newly established channel
-        Channel channel = ctx.getChannel();
-        client.channelConnected(channel);
-
+        Channel channel = ctx.channel();
         LOG.info("Connection established from {} to {}",
-                 channel.getLocalAddress(), channel.getRemoteAddress());
+                 channel.localAddress(), channel.remoteAddress());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
-        LOG.debug("Got Message: {}", event.getMessage().toString());
-        Object evm = event.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object message) {
+        LOG.debug("Got Message: {}", message.toString());
 
-        if (evm instanceof ControlMessage) {
-            LOG.debug("Got control message: {}", evm.toString());
+        if (message instanceof ControlMessage) {
+            LOG.debug("Got control message: {}", message.toString());
             return;
-        } else if (evm instanceof HBMessage) {
-            client.gotMessage((HBMessage) evm);
+        } else if (message instanceof HBMessage) {
+            client.gotMessage((HBMessage) message);
         } else {
-            LOG.warn("Got unexpected message: {} from server.", evm);
+            LOG.warn("Got unexpected message: {} from server.", message);
         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable t = event.getCause();
-        if (t instanceof ConnectException) {
-            LOG.warn("Connection to pacemaker failed. Trying to reconnect {}", t.getMessage());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof ConnectException) {
+            LOG.warn("Connection to pacemaker failed. Trying to reconnect {}", cause.getMessage());
         } else {
-            LOG.error("Exception occurred in Pacemaker.", t);
+            LOG.error("Exception occurred in Pacemaker.", cause);
         }
         client.reconnect();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
index 2d9d5c9..c88ecf2 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
@@ -13,17 +13,17 @@
 package org.apache.storm.pacemaker.codec;
 
 import java.io.IOException;
+import java.util.List;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.messaging.netty.ControlMessage;
 import org.apache.storm.messaging.netty.SaslMessageToken;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.storm.utils.Utils;
 
-public class ThriftDecoder extends FrameDecoder {
+public class ThriftDecoder extends ByteToMessageDecoder {
 
     private static final int INTEGER_SIZE = 4;
 
@@ -40,11 +40,10 @@ public class ThriftDecoder extends FrameDecoder {
     }
 
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
-
+    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) throws Exception {
         long available = buf.readableBytes();
         if (available < INTEGER_SIZE) {
-            return null;
+            return;
         }
 
         buf.markReaderIndex();
@@ -61,7 +60,7 @@ public class ThriftDecoder extends FrameDecoder {
         if (available < thriftLen) {
             // We haven't received the entire object yet, return and wait for more bytes.
             buf.resetReaderIndex();
-            return null;
+            return;
         }
 
         byte serialized[] = new byte[thriftLen];
@@ -70,12 +69,12 @@ public class ThriftDecoder extends FrameDecoder {
 
         if (m.get_type() == HBServerMessageType.CONTROL_MESSAGE) {
             ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());
-            return cm;
+            out.add(cm);
         } else if (m.get_type() == HBServerMessageType.SASL_MESSAGE_TOKEN) {
             SaslMessageToken sm = SaslMessageToken.read(m.get_data().get_message_blob());
-            return sm;
+            out.add(sm);
         } else {
-            return m;
+            out.add(m);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
index d726159..9dffd04 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
@@ -12,59 +12,56 @@
 
 package org.apache.storm.pacemaker.codec;
 
-import java.io.IOException;
+import java.util.List;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBMessageData;
 import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.messaging.netty.ControlMessage;
 import org.apache.storm.messaging.netty.INettySerializable;
 import org.apache.storm.messaging.netty.SaslMessageToken;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.ByteBufAllocator;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ThriftEncoder extends OneToOneEncoder {
+public class ThriftEncoder extends MessageToMessageEncoder<Object> {
 
     private static final Logger LOG = LoggerFactory
         .getLogger(ThriftEncoder.class);
 
-    private HBMessage encodeNettySerializable(INettySerializable netty_message,
-                                              HBServerMessageType mType) {
+    private HBMessage encodeNettySerializable(ByteBufAllocator alloc,
+        INettySerializable netty_message, HBServerMessageType mType) {
 
         HBMessageData message_data = new HBMessageData();
         HBMessage m = new HBMessage();
+        byte[] messageBuffer = new byte[netty_message.encodeLength()];
+        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(messageBuffer);
         try {
-            ChannelBuffer cbuffer = netty_message.buffer();
-            if (cbuffer.hasArray()) {
-                message_data.set_message_blob(cbuffer.array());
-            } else {
-                byte buff[] = new byte[netty_message.encodeLength()];
-                cbuffer.readBytes(buff, 0, netty_message.encodeLength());
-                message_data.set_message_blob(buff);
-            }
+            netty_message.write(wrappedBuffer);
+            
+            message_data.set_message_blob(messageBuffer);
             m.set_type(mType);
             m.set_data(message_data);
             return m;
-        } catch (IOException e) {
-            LOG.error("Failed to encode NettySerializable: ", e);
-            throw new RuntimeException(e);
+        } finally {
+            wrappedBuffer.release();
         }
     }
 
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
+    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> out) throws Exception {
         if (msg == null) {
-            return null;
+            return;
         }
 
         LOG.debug("Trying to encode: " + msg.getClass().toString() + " : " + msg.toString());
 
         HBMessage m;
+        ByteBufAllocator alloc = channelHandlerContext.alloc();
         if (msg instanceof INettySerializable) {
             INettySerializable nettyMsg = (INettySerializable) msg;
 
@@ -77,19 +74,19 @@ public class ThriftEncoder extends OneToOneEncoder {
                 LOG.error("Didn't recognise INettySerializable: " + nettyMsg.toString());
                 throw new RuntimeException("Unrecognized INettySerializable.");
             }
-            m = encodeNettySerializable(nettyMsg, type);
+            m = encodeNettySerializable(alloc, nettyMsg, type);
         } else {
             m = (HBMessage) msg;
         }
 
         try {
             byte serialized[] = Utils.thriftSerialize(m);
-            ChannelBuffer ret = ChannelBuffers.directBuffer(serialized.length + 4);
+            ByteBuf ret = alloc.ioBuffer(serialized.length + 4);
 
             ret.writeInt(serialized.length);
             ret.writeBytes(serialized);
 
-            return ret;
+            out.add(ret);
         } catch (RuntimeException e) {
             LOG.error("Failed to serialize.", e);
             throw e;

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
index 853ce74..00e1bc9 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
@@ -19,25 +19,24 @@ import org.apache.storm.messaging.netty.SaslStormClientHandler;
 import org.apache.storm.pacemaker.PacemakerClient;
 import org.apache.storm.pacemaker.PacemakerClientHandler;
 import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ThriftNettyClientCodec {
+public class ThriftNettyClientCodec extends ChannelInitializer<Channel> {
 
     public static final String SASL_HANDLER = "sasl-handler";
     public static final String KERBEROS_HANDLER = "kerberos-handler";
     private static final Logger LOG = LoggerFactory
         .getLogger(ThriftNettyClientCodec.class);
 
-    ;
     private final int thriftMessageMaxSize;
-    private PacemakerClient client;
-    private AuthMethod authMethod;
-    private Map<String, Object> topoConf;
-    private String host;
+    private final PacemakerClient client;
+    private final AuthMethod authMethod;
+    private final Map<String, Object> topoConf;
+    private final String host;
 
     public ThriftNettyClientCodec(PacemakerClient pacemaker_client, Map<String, Object> topoConf,
                                   AuthMethod authMethod, String host, int thriftMessageMaxSizeBytes) {
@@ -48,10 +47,9 @@ public class ThriftNettyClientCodec {
         thriftMessageMaxSize = thriftMessageMaxSizeBytes;
     }
 
-    public ChannelPipelineFactory pipelineFactory() {
-        return new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() {
-                ChannelPipeline pipeline = Channels.pipeline();
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
                 pipeline.addLast("encoder", new ThriftEncoder());
                 pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize));
 
@@ -74,13 +72,10 @@ public class ThriftNettyClientCodec {
                         throw new RuntimeException(e);
                     }
                 } else {
-                    client.channelReady();
+            client.channelReady(ch);
                 }
 
                 pipeline.addLast("PacemakerClientHandler", new PacemakerClientHandler(client));
-                return pipeline;
-            }
-        };
     }
 
     public enum AuthMethod {

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
index 3d41766..540d691 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -17,6 +18,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Stream;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
@@ -26,7 +28,7 @@ import org.slf4j.LoggerFactory;
 public class TransferDrainer {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
-    private Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap();
+    private final Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap<>();
 
     // Cache the msgs grouped by destination node
     public void add(TaskMessage taskMsg) {
@@ -40,26 +42,24 @@ public class TransferDrainer {
     }
 
     public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
-        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+        HashMap<NodeInfo, Stream<TaskMessage>> bundleMapByDestination = groupBundleByDestination(taskToNode);
 
-        for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
+        for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry : bundleMapByDestination.entrySet()) {
             NodeInfo node = entry.getKey();
             IConnection conn = connections.get(node);
             if (conn != null) {
-                ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
-                Iterator<TaskMessage> iter = getBundleIterator(bundle);
-                if (null != iter && iter.hasNext()) {
+                Iterator<TaskMessage> iter = entry.getValue().iterator();
+                if (iter.hasNext()) {
                     conn.send(iter);
                 }
-                entry.getValue().clear();
             } else {
                 LOG.warn("Connection not available for hostPort {}", node);
             }
         }
     }
 
-    private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
-        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new HashMap<>();
+    private HashMap<NodeInfo, Stream<TaskMessage>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
+        HashMap<NodeInfo, Stream<TaskMessage>> result = new HashMap<>();
 
         for (Entry<Integer, ArrayList<TaskMessage>> entry : bundles.entrySet()) {
             if (entry.getValue().isEmpty()) {
@@ -67,12 +67,7 @@ public class TransferDrainer {
             }
             NodeInfo node = taskToNode.get(entry.getKey());
             if (node != null) {
-                ArrayList<ArrayList<TaskMessage>> msgs = result.get(node);
-                if (msgs == null) {
-                    msgs = new ArrayList<>();
-                    result.put(node, msgs);
-                }
-                msgs.add(entry.getValue());
+                result.merge(node, entry.getValue().stream(), Stream::concat);
             } else {
                 LOG.warn("No remote destination available for task {}", entry.getKey());
             }
@@ -80,54 +75,6 @@ public class TransferDrainer {
         return result;
     }
 
-    private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-
-        if (null == bundle) {
-            return null;
-        }
-
-        return new Iterator<TaskMessage>() {
-
-            private int offset = 0;
-            private int size = 0;
-            private int bundleOffset = 0;
-            private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-
-            {
-                for (ArrayList<TaskMessage> list : bundle) {
-                    size += list.size();
-                }
-            }
-
-            @Override
-            public boolean hasNext() {
-                return offset < size;
-            }
-
-            @Override
-            public TaskMessage next() {
-                TaskMessage msg;
-                if (iter.hasNext()) {
-                    msg = iter.next();
-                } else {
-                    bundleOffset++;
-                    iter = bundle.get(bundleOffset).iterator();
-                    msg = iter.next();
-                }
-                if (null != msg) {
-                    offset++;
-                }
-                return msg;
-            }
-
-            @Override
-            public void remove() {
-                throw new RuntimeException("not supported");
-            }
-        };
-    }
-
-
     public void clear() {
         for (ArrayList<TaskMessage> taskMessages : bundles.values()) {
             taskMessages.clear();

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
index f4031f1..39db4ef 100644
--- a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
+++ b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.storm;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,18 +34,30 @@ import org.apache.storm.generated.HBPulse;
 import org.apache.storm.generated.HBServerMessageType;
 import org.apache.storm.pacemaker.PacemakerClient;
 import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.pacemaker.PacemakerConnectionException;
 import org.apache.storm.utils.Utils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class PaceMakerStateStorageFactoryTest {
-    private PaceMakerClientProxy clientProxy;
+    
+    @Captor
+    private ArgumentCaptor<HBMessage> hbMessageCaptor;
+    
+    @Mock
+    private PacemakerClient clientMock;
     private PacemakerClientPoolProxy clientPoolProxy;
     private PaceMakerStateStorage stateStorage;
 
     public void createPaceMakerStateStorage(HBServerMessageType messageType, HBMessageData messageData) throws Exception {
         HBMessage response = new HBMessage(messageType, messageData);
-        clientProxy = new PaceMakerClientProxy(response, null);
+        when(clientMock.send(any())).thenReturn(response);
         clientPoolProxy = new PacemakerClientPoolProxy();
         stateStorage = new PaceMakerStateStorage(clientPoolProxy, null);
     }
@@ -50,7 +66,8 @@ public class PaceMakerStateStorageFactoryTest {
     public void testSetWorkerHb() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.SEND_PULSE_RESPONSE, null);
         stateStorage.set_worker_hb("/foo", Utils.javaSerialize("data"), null);
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         HBPulse pulse = sent.get_data().get_pulse();
         Assert.assertEquals(HBServerMessageType.SEND_PULSE, sent.get_type());
         Assert.assertEquals("/foo", pulse.get_id());
@@ -67,7 +84,8 @@ public class PaceMakerStateStorageFactoryTest {
     public void testDeleteWorkerHb() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.DELETE_PATH_RESPONSE, null);
         stateStorage.delete_worker_hb("/foo/bar");
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         Assert.assertEquals(HBServerMessageType.DELETE_PATH, sent.get_type());
         Assert.assertEquals("/foo/bar", sent.get_data().get_path());
     }
@@ -86,7 +104,8 @@ public class PaceMakerStateStorageFactoryTest {
         hbPulse.set_details(Utils.serialize(cwh));
         createPaceMakerStateStorage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
         stateStorage.get_worker_hb("/foo", false);
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         Assert.assertEquals(HBServerMessageType.GET_PULSE, sent.get_type());
         Assert.assertEquals("/foo", sent.get_data().get_path());
     }
@@ -107,7 +126,8 @@ public class PaceMakerStateStorageFactoryTest {
     public void testGetWorkerHbChildren() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE, HBMessageData.nodes(new HBNodes()));
         stateStorage.get_worker_hb_children("/foo", false);
-        HBMessage sent = clientProxy.checkCaptured();
+        verify(clientMock).send(hbMessageCaptor.capture());
+        HBMessage sent = hbMessageCaptor.getValue();
         Assert.assertEquals(HBServerMessageType.GET_ALL_NODES_FOR_PATH, sent.get_type());
         Assert.assertEquals("/foo", sent.get_data().get_path());
     }
@@ -118,42 +138,22 @@ public class PaceMakerStateStorageFactoryTest {
         stateStorage.get_worker_hb_children("/foo", false);
     }
 
-    private class PaceMakerClientProxy extends PacemakerClient {
-        private HBMessage response;
-        private HBMessage captured;
-
-        public PaceMakerClientProxy(HBMessage response, HBMessage captured) {
-            this.response = response;
-            this.captured = captured;
-        }
-
-        @Override
-        public HBMessage send(HBMessage m) {
-            captured = m;
-            return response;
-        }
-
-        public HBMessage checkCaptured() {
-            return captured;
-        }
-    }
-
     private class PacemakerClientPoolProxy extends PacemakerClientPool {
         public PacemakerClientPoolProxy() {
             super(new HashMap<>());
         }
 
         public PacemakerClient getWriteClient() {
-            return clientProxy;
+            return clientMock;
         }
 
-        public HBMessage send(HBMessage m) {
-            return clientProxy.send(m);
+        public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException {
+            return clientMock.send(m);
         }
 
-        public List<HBMessage> sendAll(HBMessage m) {
+        public List<HBMessage> sendAll(HBMessage m) throws PacemakerConnectionException, InterruptedException {
             List<HBMessage> response = new ArrayList<>();
-            response.add(clientProxy.send(m));
+            response.add(clientMock.send(m));
             return response;
         }
     }


[2/5] storm git commit: STORM-1038: Upgrade to latest Netty

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 47fd498..277f252 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -50,10 +50,10 @@
         </dependency>
 
         <!--Hadoop Mini Cluster cannot use log4j2 bridge,
-            Surefire has a way to exclude the conflicting log4j API jar
-            from the classpath, classpathDependencyExcludes, but it didn't work in practice.
-            This is here as a work around to place it at the beginning of the classpath
-            even though maven does not officially support ordering of the classpath.-->
+        Surefire has a way to exclude the conflicting log4j API jar
+        from the classpath, classpathDependencyExcludes, but it didn't work in practice.
+        This is here as a work around to place it at the beginning of the classpath
+        even though maven does not officially support ordering of the classpath.-->
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
@@ -280,17 +280,17 @@
             <resource>
                 <directory>${basedir}/src/resources</directory>
                 <excludes>
-                  <exclude>storm-core-version-info.properties</exclude>
+                    <exclude>storm-core-version-info.properties</exclude>
                 </excludes>
                 <filtering>false</filtering>
-           </resource>
-           <resource>
-              <directory>${basedir}/src/resources</directory>
-              <includes>
-                <include>storm-core-version-info.properties</include>
-             </includes>
-             <filtering>true</filtering>
-         </resource>
+            </resource>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+                <includes>
+                    <include>storm-core-version-info.properties</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
         </resources>
         <testResources>
             <testResource>
@@ -371,29 +371,29 @@
                 </configuration>
             </plugin>
             <plugin>
-               <groupId>org.apache.storm</groupId>
-               <artifactId>storm-maven-plugins</artifactId>
-               <version>${project.version}</version>
-               <executions>
-                 <execution>
-                   <id>version-info</id>
-                   <phase>generate-resources</phase>
-                   <goals>
-                     <goal>version-info</goal>
-                   </goals>
-                   <configuration>
-                   <source>
-                      <directory>${basedir}/src/</directory>
-                      <includes>
-                        <include>jvm/**/*.java</include>
-                        <include>clj/**/*.clj</include>
-                      </includes>
-                  </source>
-                 </configuration>
-               </execution>
-             </executions>
-          </plugin>
-          <plugin>
+                <groupId>org.apache.storm</groupId>
+                <artifactId>storm-maven-plugins</artifactId>
+                <version>${project.version}</version>
+                <executions>
+                    <execution>
+                        <id>version-info</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>version-info</goal>
+                        </goals>
+                        <configuration>
+                            <source>
+                                <directory>${basedir}/src/</directory>
+                                <includes>
+                                    <include>jvm/**/*.java</include>
+                                    <include>clj/**/*.clj</include>
+                                </includes>
+                            </source>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <artifactId>maven-dependency-plugin</artifactId>
                 <version>2.8</version>
                 <executions>
@@ -411,33 +411,33 @@
                         </configuration>
                     </execution>
                     <!-- multi-lang resources -->
-                               <execution>
-                                 <id>unpack</id>
-                                 <phase>process-test-resources</phase>
-                                 <goals>
-                                   <goal>unpack</goal>
-                                 </goals>
-                                 <configuration>
-                                   <artifactItems>
-                                     <artifactItem>
-                                       <groupId>org.apache.storm</groupId>
-                                       <artifactId>multilang-ruby</artifactId>
-                                       <version>${project.version}</version>
-                                     </artifactItem>
-                                     <artifactItem>
-                                        <groupId>org.apache.storm</groupId>
-                                        <artifactId>multilang-python</artifactId>
-                                        <version>${project.version}</version>
-                                      </artifactItem>
-                                      <artifactItem>
-                                         <groupId>org.apache.storm</groupId>
-                                         <artifactId>multilang-javascript</artifactId>
-                                         <version>${project.version}</version>
-                                       </artifactItem>
-                                   </artifactItems>
-                                   <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
-                                 </configuration>
-                               </execution>
+                    <execution>
+                        <id>unpack</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.storm</groupId>
+                                    <artifactId>multilang-ruby</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.storm</groupId>
+                                    <artifactId>multilang-python</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.storm</groupId>
+                                    <artifactId>multilang-javascript</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                            </artifactItems>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                        </configuration>
+                    </execution>
 
                 </executions>
             </plugin>
@@ -473,7 +473,9 @@
     <profiles>
         <profile>
             <id>coverage</id>
-            <activation><activeByDefault>true</activeByDefault></activation>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
             <build>
                 <plugins>
                     <plugin>
@@ -572,7 +574,9 @@
                         <executions>
                             <execution>
                                 <phase>generate-sources</phase>
-                                <goals><goal>exec</goal></goals>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
                                 <configuration>
                                     <executable>sh</executable>
                                     <arguments>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index dffbfc0..702f58e 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +102,10 @@ public class NettyTest {
             sleep());
     }
 
+    private void send(IConnection client, int taskId, byte[] messageBytes) {
+        client.send(Collections.singleton(new TaskMessage(taskId, messageBytes)).iterator());
+    }
+
     private void doTestBasic(Map<String, Object> stormConf) throws Exception {
         LOG.info("1. Should send and receive a basic message");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
@@ -113,7 +118,7 @@ public class NettyTest {
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
-                client.send(taskId, messageBytes);
+                send(client, taskId, messageBytes);
 
                 waitForNotNull(response);
                 TaskMessage responseMessage = response.get();
@@ -163,7 +168,7 @@ public class NettyTest {
     public void testBasicWithSasl() throws Exception {
         doTestBasic(withSaslConf(basicConf()));
     }
-
+    
     private void doTestLoad(Map<String, Object> stormConf) throws Exception {
         LOG.info("2 test load");
         String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
@@ -176,7 +181,19 @@ public class NettyTest {
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
-                client.send(taskId, messageBytes);
+                send(client, taskId, messageBytes);
+                /*
+                 * This test sends a broadcast to all connected clients from the server, so we need to wait until the server has registered
+                 * the client as connected before sending load metrics.
+                 *
+                 * It's not enough to wait until the client reports that the channel is open, because the server event loop may not have
+                 * finished running channelActive for the new channel. If we send metrics too early, the server will broadcast to no one.
+                 *
+                 * By waiting for the response here, we ensure that the client will be registered at the server before we send load metrics.
+                 */
+
+                waitForNotNull(response);
+
                 Map<Integer, Double> taskToLoad = new HashMap<>();
                 taskToLoad.put(1, 0.0);
                 taskToLoad.put(2, 1.0);
@@ -191,10 +208,6 @@ public class NettyTest {
                 Map<Integer, Load> load = client.getLoad(tasks);
                 assertThat(load.get(1).getBoltLoad(), is(0.0));
                 assertThat(load.get(2).getBoltLoad(), is(1.0));
-                waitForNotNull(response);
-                TaskMessage responseMessage = response.get();
-                assertThat(responseMessage.task(), is(taskId));
-                assertThat(responseMessage.message(), is(messageBytes));
             }
         } finally {
             context.term();
@@ -223,7 +236,7 @@ public class NettyTest {
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
-                client.send(taskId, messageBytes);
+                send(client, taskId, messageBytes);
 
                 waitForNotNull(response);
                 TaskMessage responseMessage = response.get();
@@ -240,7 +253,7 @@ public class NettyTest {
         conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 102_400);
         return conf;
     }
-    
+
     @Test
     public void testLargeMessage() throws Exception {
         doTestLargeMessage(largeMessageConf());
@@ -274,7 +287,7 @@ public class NettyTest {
                     serverStart.get(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                     byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
-                    client.send(taskId, messageBytes);
+                    send(client, taskId, messageBytes);
 
                     waitForNotNull(response);
                     TaskMessage responseMessage = response.get();
@@ -317,7 +330,7 @@ public class NettyTest {
                 waitUntilReady(client, server);
 
                 IntStream.range(1, numMessages)
-                    .forEach(i -> client.send(taskId, String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
+                    .forEach(i -> send(client, taskId, String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
 
                 Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
                     () -> responses.size() < numMessages - 1,
@@ -340,7 +353,7 @@ public class NettyTest {
         conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1_024_000);
         return conf;
     }
-    
+
     @Test
     public void testBatch() throws Exception {
         doTestBatch(batchConf());
@@ -360,11 +373,11 @@ public class NettyTest {
             int port = Utils.getAvailablePort(6700);
             try (IConnection client = context.connect(null, "localhost", port, remoteBpStatus)) {
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
-                client.send(taskId, messageBytes);
+                send(client, taskId, messageBytes);
                 try (IConnection server = context.bind(null, port)) {
                     server.registerRecv(mkConnectionCallback(response::set));
                     waitUntilReady(client, server);
-                    client.send(taskId, messageBytes);
+                    send(client, taskId, messageBytes);
                     waitForNotNull(response);
                     TaskMessage responseMessage = response.get();
                     assertThat(responseMessage.task(), is(taskId));

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 0d9501a..43d2e19 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -171,7 +171,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>783</maxAllowedViolations>
+                    <maxAllowedViolations>780</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java b/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
index a7ebc1e..73451e9 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
@@ -16,5 +16,5 @@ import org.apache.storm.generated.HBMessage;
 
 public interface IServerMessageHandler {
 
-    public HBMessage handleMessage(HBMessage m, boolean authenticated);
+    HBMessage handleMessage(HBMessage m, boolean authenticated);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
index a75a312..1543c64 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
@@ -14,8 +14,6 @@ package org.apache.storm.pacemaker;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import javax.security.auth.login.Configuration;
 import org.apache.storm.Config;
@@ -25,35 +23,41 @@ import org.apache.storm.messaging.netty.ISaslServer;
 import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
 import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec;
 import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ServerBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.group.ChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.ServerBootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelFuture;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
+import org.apache.storm.shade.io.netty.channel.group.ChannelGroup;
+import org.apache.storm.shade.io.netty.channel.group.DefaultChannelGroup;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.storm.shade.io.netty.util.concurrent.GlobalEventExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class PacemakerServer implements ISaslServer {
 
-    private static final long FIVE_MB_IN_BYTES = 5 * 1024 * 1024;
+    private static final int FIVE_MB_IN_BYTES = 5 * 1024 * 1024;
 
     private static final Logger LOG = LoggerFactory.getLogger(PacemakerServer.class);
 
-    private final ServerBootstrap bootstrap;
-    private int port;
-    private IServerMessageHandler handler;
+    private final IServerMessageHandler handler;
     private String secret;
-    private String topo_name;
-    private volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
-    private ConcurrentSkipListSet<Channel> authenticated_channels = new ConcurrentSkipListSet<Channel>();
-    private ThriftNettyServerCodec.AuthMethod authMethod;
+    private final String topologyName;
+    private volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
+    private final ChannelGroup authenticated_channels = new DefaultChannelGroup("authenticated-pacemaker-channels", GlobalEventExecutor.INSTANCE);
+    private final ThriftNettyServerCodec.AuthMethod authMethod;
+    private final EventLoopGroup bossEventLoopGroup;
+    private final EventLoopGroup workerEventLoopGroup;
 
     public PacemakerServer(IServerMessageHandler handler, Map<String, Object> config) {
         int maxWorkers = (int) config.get(DaemonConfig.PACEMAKER_MAX_THREADS);
-        this.port = (int) config.get(Config.PACEMAKER_PORT);
+        int port = (int) config.get(Config.PACEMAKER_PORT);
         this.handler = handler;
-        this.topo_name = "pacemaker_server";
+        this.topologyName = "pacemaker_server";
 
         String auth = (String) config.get(Config.PACEMAKER_AUTH_METHOD);
         switch (auth) {
@@ -83,48 +87,51 @@ class PacemakerServer implements ISaslServer {
 
         ThreadFactory bossFactory = new NettyRenameThreadFactory("server-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("server-worker");
-        NioServerSocketChannelFactory factory;
-        if (maxWorkers > 0) {
-            factory =
-                new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                  Executors.newCachedThreadPool(workerFactory),
-                                                  maxWorkers);
-        } else {
-            factory =
-                new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-                                                  Executors.newCachedThreadPool(workerFactory));
-        }
+        this.bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory);
+        // 0 means DEFAULT_EVENT_LOOP_THREADS
+        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
+
+        LOG.info("Create Netty Server " + name() + ", buffer_size: " + FIVE_MB_IN_BYTES + ", maxWorkers: " + maxWorkers);
 
-        bootstrap = new ServerBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES);
-        bootstrap.setOption("keepAlive", true);
         int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
-        ChannelPipelineFactory pipelineFactory =
-            new ThriftNettyServerCodec(this, config, authMethod, thriftMessageMaxSize)
-                .pipelineFactory();
-        bootstrap.setPipelineFactory(pipelineFactory);
-        Channel channel = bootstrap.bind(new InetSocketAddress(port));
-        allChannels.add(channel);
+        ServerBootstrap bootstrap = new ServerBootstrap()
+            .group(bossEventLoopGroup, workerEventLoopGroup)
+            .channel(NioServerSocketChannel.class)
+            .childOption(ChannelOption.TCP_NODELAY, true)
+            .childOption(ChannelOption.SO_SNDBUF, FIVE_MB_IN_BYTES)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
+            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+            .childHandler(new ThriftNettyServerCodec(this, config, authMethod, thriftMessageMaxSize));
+
+        try {
+            ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
+            allChannels.add(channelFuture.channel());
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
         LOG.info("Bound server to port: {}", Integer.toString(port));
     }
 
     /** Implementing IServer. **/
-    public void channelConnected(Channel c) {
+    @Override
+    public void channelActive(Channel c) {
         allChannels.add(c);
     }
 
     public void cleanPipeline(Channel channel) {
         boolean authenticated = authenticated_channels.contains(channel);
         if (!authenticated) {
-            if (channel.getPipeline().get(ThriftNettyServerCodec.SASL_HANDLER) != null) {
-                channel.getPipeline().remove(ThriftNettyServerCodec.SASL_HANDLER);
-            } else if (channel.getPipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) {
-                channel.getPipeline().remove(ThriftNettyServerCodec.KERBEROS_HANDLER);
+            if (channel.pipeline().get(ThriftNettyServerCodec.SASL_HANDLER) != null) {
+                channel.pipeline().remove(ThriftNettyServerCodec.SASL_HANDLER);
+            } else if (channel.pipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) {
+                channel.pipeline().remove(ThriftNettyServerCodec.KERBEROS_HANDLER);
             }
         }
     }
 
+    @Override
     public void received(Object mesg, String remote, Channel channel) throws InterruptedException {
         cleanPipeline(channel);
 
@@ -135,26 +142,23 @@ class PacemakerServer implements ISaslServer {
         HBMessage response = handler.handleMessage(m, authenticated);
         if (response != null) {
             LOG.debug("Got Response from handler: {}", response);
-            channel.write(response);
+            channel.writeAndFlush(response, channel.voidPromise());
         } else {
             LOG.info("Got null response from handler handling message: {}", m);
         }
     }
 
-    public void closeChannel(Channel c) {
-        c.close().awaitUninterruptibly();
-        allChannels.remove(c);
-        authenticated_channels.remove(c);
-    }
-
+    @Override
     public String name() {
-        return topo_name;
+        return topologyName;
     }
 
+    @Override
     public String secretKey() {
         return secret;
     }
 
+    @Override
     public void authenticated(Channel c) {
         LOG.debug("Pacemaker server authenticated channel: {}", c.toString());
         authenticated_channels.add(c);

http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
index 32ca92c..45babf9 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
@@ -22,13 +23,13 @@ import org.apache.storm.messaging.netty.KerberosSaslServerHandler;
 import org.apache.storm.messaging.netty.SaslStormServerHandler;
 import org.apache.storm.messaging.netty.StormServerHandler;
 import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ThriftNettyServerCodec {
+public class ThriftNettyServerCodec extends ChannelInitializer<Channel> {
 
     public static final String SASL_HANDLER = "sasl-handler";
     public static final String KERBEROS_HANDLER = "kerberos-handler";
@@ -36,26 +37,24 @@ public class ThriftNettyServerCodec {
         .getLogger(ThriftNettyServerCodec.class);
 
     ;
-    private final int thriftMessageMaxSize;
-    private IServer server;
-    private AuthMethod authMethod;
-    private Map<String, Object> topoConf;
+    private final int thriftMessageMaxSizeBytes;
+    private final IServer server;
+    private final AuthMethod authMethod;
+    private final Map<String, Object> topoConf;
 
     public ThriftNettyServerCodec(IServer server, Map<String, Object> topoConf,
                                   AuthMethod authMethod, int thriftMessageMaxSizeBytes) {
         this.server = server;
         this.authMethod = authMethod;
         this.topoConf = topoConf;
-        thriftMessageMaxSize = thriftMessageMaxSizeBytes;
+        this.thriftMessageMaxSizeBytes = thriftMessageMaxSizeBytes;
     }
 
-    public ChannelPipelineFactory pipelineFactory() {
-        return new ChannelPipelineFactory() {
-            public ChannelPipeline getPipeline() {
-
-                ChannelPipeline pipeline = Channels.pipeline();
+    @Override
+    protected void initChannel(Channel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
                 pipeline.addLast("encoder", new ThriftEncoder());
-                pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize));
+        pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSizeBytes));
                 if (authMethod == AuthMethod.DIGEST) {
                     try {
                         LOG.debug("Adding SaslStormServerHandler to pacemaker server pipeline.");
@@ -66,7 +65,7 @@ public class ThriftNettyServerCodec {
                 } else if (authMethod == AuthMethod.KERBEROS) {
                     try {
                         LOG.debug("Adding KerberosSaslServerHandler to pacemaker server pipeline.");
-                        ArrayList<String> authorizedUsers = new ArrayList(1);
+                ArrayList<String> authorizedUsers = new ArrayList<>(1);
                         authorizedUsers.add((String) topoConf.get(DaemonConfig.NIMBUS_DAEMON_USER));
                         pipeline.addLast(KERBEROS_HANDLER, new KerberosSaslServerHandler((ISaslServer) server,
                                                                                          topoConf,
@@ -80,9 +79,6 @@ public class ThriftNettyServerCodec {
                 }
 
                 pipeline.addLast("handler", new StormServerHandler(server));
-                return pipeline;
-            }
-        };
     }
 
     public enum AuthMethod {


[5/5] storm git commit: Merge branch 'STORM-1038-merge'

Posted by ka...@apache.org.
Merge branch 'STORM-1038-merge'

This closes #2707


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4aa6b5e1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4aa6b5e1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4aa6b5e1

Branch: refs/heads/master
Commit: 4aa6b5e18cf2dcefb509f965cd8a61e31389d6aa
Parents: c24cf97 e230828
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Jun 9 20:37:03 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Jun 9 20:37:23 2018 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |   1 +
 pom.xml                                         |   8 +-
 shaded-deps/pom.xml                             |   8 +-
 storm-client/pom.xml                            |   2 +-
 .../src/jvm/org/apache/storm/Config.java        |   8 +
 .../org/apache/storm/daemon/worker/Worker.java  |   9 +-
 .../org/apache/storm/messaging/IConnection.java |   8 -
 .../apache/storm/messaging/local/Context.java   |  19 ---
 .../messaging/netty/BackPressureStatus.java     |  11 +-
 .../netty/BackPressureStatusEncoder.java        |  37 +++++
 .../apache/storm/messaging/netty/Client.java    | 158 ++++++++-----------
 .../apache/storm/messaging/netty/Context.java   |  31 ++--
 .../storm/messaging/netty/ControlMessage.java   |  32 ++--
 .../messaging/netty/INettySerializable.java     |   9 +-
 .../storm/messaging/netty/ISaslClient.java      |   6 +-
 .../storm/messaging/netty/ISaslServer.java      |   2 +-
 .../apache/storm/messaging/netty/IServer.java   |   6 +-
 .../netty/KerberosSaslClientHandler.java        | 107 ++++++-------
 .../netty/KerberosSaslNettyClientState.java     |   9 +-
 .../netty/KerberosSaslNettyServerState.java     |   9 +-
 .../netty/KerberosSaslServerHandler.java        |  53 +++----
 .../storm/messaging/netty/MessageBatch.java     |  40 +++--
 .../storm/messaging/netty/MessageDecoder.java   |  53 ++++---
 .../storm/messaging/netty/MessageEncoder.java   |  50 ------
 .../netty/NettyRenameThreadFactory.java         |  18 +--
 .../netty/NettySerializableMessageEncoder.java  |  37 +++++
 .../storm/messaging/netty/SaslMessageToken.java |  32 ++--
 .../messaging/netty/SaslNettyClientState.java   |   9 +-
 .../messaging/netty/SaslNettyServerState.java   |   9 +-
 .../messaging/netty/SaslStormClientHandler.java | 114 ++++++-------
 .../netty/SaslStormServerAuthorizeHandler.java  |  22 +--
 .../messaging/netty/SaslStormServerHandler.java |  62 ++++----
 .../apache/storm/messaging/netty/Server.java    | 145 ++++++++---------
 .../messaging/netty/StormClientHandler.java     |  35 ++--
 .../netty/StormClientPipelineFactory.java       |  22 +--
 .../messaging/netty/StormServerHandler.java     |  45 +++---
 .../netty/StormServerPipelineFactory.java       |  46 ++++--
 .../apache/storm/pacemaker/PacemakerClient.java |  68 ++++----
 .../storm/pacemaker/PacemakerClientHandler.java |  46 +++---
 .../storm/pacemaker/codec/ThriftDecoder.java    |  23 ++-
 .../storm/pacemaker/codec/ThriftEncoder.java    |  47 +++---
 .../pacemaker/codec/ThriftNettyClientCodec.java |  29 ++--
 .../org/apache/storm/utils/TransferDrainer.java |  73 ++-------
 .../storm/PaceMakerStateStorageFactoryTest.java |  62 ++++----
 storm-core/pom.xml                              | 134 ++++++++--------
 .../apache/storm/messaging/netty/NettyTest.java |  41 +++--
 storm-server/pom.xml                            |   2 +-
 .../storm/pacemaker/IServerMessageHandler.java  |   2 +-
 .../apache/storm/pacemaker/PacemakerServer.java | 108 +++++++------
 .../pacemaker/codec/ThriftNettyServerCodec.java |  34 ++--
 50 files changed, 901 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------