Posted to commits@phoenix.apache.org by st...@apache.org on 2021/03/24 10:06:41 UTC

[phoenix-omid] branch master updated: OMID-202 Refactor Omid to use Netty 4

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-omid.git


The following commit(s) were added to refs/heads/master by this push:
     new 18ef57d  OMID-202 Refactor Omid to use Netty 4
18ef57d is described below

commit 18ef57d4ec23d96f0ff7dbebec399d838c6c7566
Author: Istvan Toth <st...@apache.org>
AuthorDate: Fri Mar 12 11:23:00 2021 +0100

    OMID-202 Refactor Omid to use Netty 4
---
 pom.xml                                            |   2 +-
 transaction-client/pom.xml                         |   4 +-
 .../java/org/apache/omid/tso/client/TSOClient.java | 143 +++++++-------
 tso-server/pom.xml                                 |   4 +-
 .../apache/omid/tso/AbstractRequestProcessor.java  |   2 +-
 .../src/main/java/org/apache/omid/tso/Batch.java   |   2 +-
 .../java/org/apache/omid/tso/PersistEvent.java     |   2 +-
 .../org/apache/omid/tso/PersistenceProcessor.java  |   2 +-
 .../apache/omid/tso/PersistenceProcessorImpl.java  |   2 +-
 .../omid/tso/PersitenceProcessorNullImpl.java      |   2 +-
 .../java/org/apache/omid/tso/ReplyProcessor.java   |   2 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java    |  10 +-
 .../java/org/apache/omid/tso/RequestProcessor.java |   2 +-
 .../apache/omid/tso/RequestProcessorPersistCT.java |   2 +-
 .../apache/omid/tso/RequestProcessorSkipCT.java    |   2 +-
 .../java/org/apache/omid/tso/RetryProcessor.java   |   2 +-
 .../org/apache/omid/tso/RetryProcessorImpl.java    |   2 +-
 .../org/apache/omid/tso/TSOChannelHandler.java     | 211 ++++++++++-----------
 .../org/apache/omid/tso/ProgrammableTSOServer.java | 112 ++++++-----
 .../test/java/org/apache/omid/tso/TestBatch.java   |   2 +-
 .../apache/omid/tso/TestPersistenceProcessor.java  |   2 +-
 .../omid/tso/TestPersistenceProcessorHandler.java  |   2 +-
 .../org/apache/omid/tso/TestReplyProcessor.java    |   2 +-
 .../org/apache/omid/tso/TestRequestProcessor.java  |   2 +-
 .../org/apache/omid/tso/TestRetryProcessor.java    |   2 +-
 .../omid/tso/TestTSOChannelHandlerNetty.java       | 157 ++++++++-------
 .../org/apache/omid/tso/client/TSOClientRaw.java   | 106 ++++++-----
 27 files changed, 402 insertions(+), 383 deletions(-)

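For readers skimming the diff below, the heart of the change is the move from Netty 3's ClientBootstrap/ChannelFactory pair to Netty 4's Bootstrap driven by an EventLoopGroup and a ChannelInitializer. The following sketch is illustrative only and is not part of the commit; the class name, endpoint ("localhost", 54758) and the omitted protobuf handlers are placeholders, shown merely to mirror the pattern TSOClient adopts in this patch.

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;

    public class Netty4ClientSketch {
        public static void main(String[] args) throws Exception {
            // One worker group replaces the Netty 3 boss/worker thread pools of NioClientSocketChannelFactory
            EventLoopGroup workerGroup = new NioEventLoopGroup(3);
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(workerGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
                        // The pipeline is now built per-channel in a ChannelInitializer
                        // instead of via bootstrap.getPipeline()
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
                                p.addLast("lengthprepender", new LengthFieldPrepender(4));
                                // protobuf codecs and the application handler would follow here,
                                // as they do in TSOClient below
                            }
                        });
                // Placeholder endpoint; connect() is asynchronous, sync() waits for the result
                ChannelFuture f = bootstrap.connect("localhost", 54758).sync();
                f.channel().closeFuture().sync();
            } finally {
                // Replaces factory.releaseExternalResources() from Netty 3
                workerGroup.shutdownGracefully();
            }
        }
    }

The same shape recurs on the server side of the patch, where ServerBootstrap takes separate boss and worker NioEventLoopGroups and a childHandler ChannelInitializer in place of the removed TSOPipelineFactory.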
diff --git a/pom.xml b/pom.xml
index fea3247..0447d9b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,7 +175,7 @@
         <testng.version>6.10</testng.version>
         <slf4j.version>1.7.7</slf4j.version>
         <log4j.version>1.2.17</log4j.version>
-        <netty.version>3.10.6.Final</netty.version>
+        <netty4.version>4.1.60.Final</netty4.version>
         <protobuf.version>2.5.0</protobuf.version>
         <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
         <os.plugin.version>1.6.2</os.plugin.version>
diff --git a/transaction-client/pom.xml b/transaction-client/pom.xml
index ef79107..43989b2 100644
--- a/transaction-client/pom.xml
+++ b/transaction-client/pom.xml
@@ -69,8 +69,8 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>${netty.version}</version>
+            <artifactId>netty-all</artifactId>
+            <version>${netty4.version}</version>
         </dependency>
 
         <!-- end distributed comm -->
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 43eaaa6..172d5eb 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -30,25 +30,26 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +65,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 
@@ -84,8 +86,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     private CuratorFramework zkClient;
     private NodeCache currentTSOZNode;
 
-    private ChannelFactory factory;
-    private ClientBootstrap bootstrap;
+    private Bootstrap bootstrap;
     private Channel currentChannel;
     private final ScheduledExecutorService fsmExecutor;
     StateMachine.Fsm fsm;
@@ -115,17 +116,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     // Avoid instantiation
     private TSOClient(OmidClientConfiguration omidConf) throws IOException {
 
-        // Start client with Nb of active threads = 3 as maximum.
-        int tsoExecutorThreads = omidConf.getExecutorThreads();
-
-        factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
-        // Create the bootstrap
-        bootstrap = new ClientBootstrap(factory);
-
         requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
         requestMaxRetries = omidConf.getRequestMaxRetries();
         tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
@@ -160,21 +150,31 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         fsm = new StateMachine.FsmImpl(fsmExecutor);
         fsm.setInitState(new DisconnectedState(fsm));
 
-        ChannelPipeline pipeline = bootstrap.getPipeline();
-        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-        pipeline.addLast("handler", new Handler(fsm));
-
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("connectTimeoutMillis", 100);
-        lowLatency = false;
-
+        // Start client with the configured thread count
+        int tsoExecutorThreads = omidConf.getExecutorThreads();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(tsoExecutorThreads, workerThreadFactory);
 
+        bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+                pipeline.addLast("protobufencoder", new ProtobufEncoder());
+                pipeline.addLast("inboundHandler", new Handler(fsm));
+            }
+        });
+        bootstrap.option(ChannelOption.TCP_NODELAY, true);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
 
+        lowLatency = false;
         conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
 
     }
@@ -317,6 +317,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
      * Used for family deletion
      * @return the conflict detection level.
      */
+    @Override
     public ConflictDetectionLevel getConflictDetectionLevel() {
         return conflictDetectionLevel;
     }
@@ -324,6 +325,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     /**
      * Used for family deletion testing
      */
+    @Override
     public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
         this.conflictDetectionLevel = conflictDetectionLevel;
     }
@@ -342,7 +344,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         setTSOAddress(hp.getHost(), hp.getPort());
         epoch = Long.parseLong(currentTSOAndEpochArray[1]);
         LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
-        if (currentChannel != null && currentChannel.isConnected()) {
+        if (currentChannel != null && currentChannel.isActive()) {
             LOG.info("\tClosing channel with previous TSO {}", currentChannel);
             currentChannel.close();
         }
@@ -513,7 +515,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         }
 
         public StateMachine.State handleEvent(CloseEvent e) {
-            factory.releaseExternalResources();
+            bootstrap.config().group().shutdownGracefully();
             e.success(null);
             return this;
         }
@@ -527,10 +529,11 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
                     if (channelFuture.isSuccess()) {
                         LOG.info("Connection to TSO [{}] established. Channel {}",
-                                 tsoAddress, channelFuture.getChannel());
+                                 tsoAddress, channelFuture.channel());
                     } else {
                         LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
-                                  tsoAddress, channelFuture.getChannel());
+                                  tsoAddress, channelFuture.channel());
+                        fsm.sendEvent(new ErrorEvent(new ConnectionException()));
                     }
                 }
             });
@@ -582,6 +585,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
             return timeout;
         }
 
+        @Override
         public String toString() {
             String info = "Request type ";
             if (event.getRequest().hasTimestampRequest()) {
@@ -610,7 +614,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
             TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
             // Add the required handshake capabilities when necessary
             handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
-            channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+            channel.writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
             timeout = newTimeout();
         }
 
@@ -764,13 +768,13 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
                 request.error(new IllegalArgumentException("Unknown request type"));
                 return;
             }
-            ChannelFuture f = channel.write(req);
+            ChannelFuture f = channel.writeAndFlush(req);
 
             f.addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) {
                     if (!future.isSuccess()) {
-                        fsm.sendEvent(new ErrorEvent(future.getCause()));
+                        fsm.sendEvent(new ErrorEvent(future.cause()));
                     }
                 }
             });
@@ -1003,7 +1007,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     // Helper classes & methods
     // ----------------------------------------------------------------------------------------------------------------
 
-    private class Handler extends SimpleChannelHandler {
+    private class Handler extends ChannelInboundHandlerAdapter {
 
         private StateMachine.Fsm fsm;
 
@@ -1012,37 +1016,34 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         }
 
         @Override
-        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-            currentChannel = e.getChannel();
-            LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
-            fsm.sendEvent(new ConnectedEvent(e.getChannel()));
+        public void channelActive(ChannelHandlerContext ctx) {
+            currentChannel = ctx.channel();
+            LOG.debug("HANDLER (CHANNEL ACTIVE): Connection {}. Sending connected event to FSM", ctx.channel());
+            fsm.sendEvent(new ConnectedEvent(ctx.channel()));
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            LOG.debug("HANDLER (CHANNEL INACTIVE): Connection {}. Sending error, then channelClosed event to FSM", ctx.channel());
+            // Netty 3 had separate callbacks, and the FSM expects both events.
+            // Sending both is much easier than rewriting the FSM
             fsm.sendEvent(new ErrorEvent(new ConnectionException()));
-        }
-
-        @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
             fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-            if (e.getMessage() instanceof TSOProto.Response) {
-                fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            if (msg instanceof TSOProto.Response) {
+                fsm.sendEvent(new ResponseEvent((TSOProto.Response) msg));
             } else {
-                LOG.warn("Received unknown message", e.getMessage());
+                LOG.warn("Received unknown message", msg);
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-            LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
-            fsm.sendEvent(new ErrorEvent(e.getCause()));
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            LOG.error("Error on channel {}", ctx.channel(), cause);
+            fsm.sendEvent(new ErrorEvent(cause));
         }
     }
 
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index f5e3bbf..e2e0253 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -111,8 +111,8 @@
 
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>${netty.version}</version>
+            <artifactId>netty-all</artifactId>
+            <version>${netty4.version}</version>
         </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
diff --git a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
index 9f502df..2eb9cb1 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
@@ -28,7 +28,7 @@ import com.lmax.disruptor.dsl.Disruptor;
 
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 3c3e432..57405f4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -23,7 +23,7 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 9528d66..c177f02 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -19,7 +19,7 @@ package org.apache.omid.tso;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 public final class PersistEvent {
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index beb51c4..7982e12 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.omid.tso;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.Closeable;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 9360881..e1e3c60 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -30,7 +30,7 @@ import com.lmax.disruptor.dsl.Disruptor;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
index e4aed24..411770e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
@@ -18,7 +18,7 @@
 package org.apache.omid.tso;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.IOException;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index d548909..50cf8a5 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.omid.tso;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.Closeable;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index b51612e..7b4e4dc 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -32,7 +32,7 @@ import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.metrics.Meter;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -210,7 +210,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
                 .setStartTimestamp(startTimestamp)
                 .setCommitTimestamp(commitTimestamp);
         builder.setCommitResponse(commitBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
         commitMeter.mark();
         monCtx.timerStop("reply.processor.commit.latency");
     }
@@ -223,7 +223,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         commitBuilder.setAborted(true);
         commitBuilder.setStartTimestamp(startTimestamp);
         builder.setCommitResponse(commitBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
         abortMeter.mark();
         monCtx.timerStop("reply.processor.abort.latency");
     }
@@ -235,7 +235,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
         respBuilder.setStartTimestamp(startTimestamp);
         builder.setTimestampResponse(respBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
         timestampMeter.mark();
         monCtx.timerStop("reply.processor.timestamp.latency");
     }
@@ -248,7 +248,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         fenceBuilder.setTableId(tableID);
         fenceBuilder.setFenceId(fenceTimestamp);
         builder.setFenceResponse(fenceBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
         monCtx.timerStop("reply.processor.fence.latency");
         fenceMeter.mark();
     }
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
index 062329d..2f83b6e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
@@ -17,7 +17,7 @@
  */
 package org.apache.omid.tso;
 
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.Closeable;
 import java.util.Collection;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
index 7323638..15b5c78 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -20,7 +20,7 @@ package org.apache.omid.tso;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.IOException;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
index 2760e19..f8ce89a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -20,7 +20,7 @@ package org.apache.omid.tso;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.IOException;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java
index c17b29a..1f9c000 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java
@@ -17,7 +17,7 @@
  */
 package org.apache.omid.tso;
 
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 
 import java.io.Closeable;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 988e3cd..3654c58 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -31,7 +31,7 @@ import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.metrics.Meter;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index 6269bb8..815ce55 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -17,56 +17,58 @@
  */
 package org.apache.omid.tso;
 
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ThreadFactory;
+
+import javax.inject.Inject;
+
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 /**
  * ChannelHandler for the TSO Server.
  *
  * Incoming requests are processed in this class
  */
-public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
+// Marked sharable, as all global members used in callbacks are singletons.
+@Sharable
+public class TSOChannelHandler extends ChannelInboundHandlerAdapter implements Closeable {
 
-    private final ChannelFactory factory;
+    private final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
 
     private final ServerBootstrap bootstrap;
 
     @VisibleForTesting
     Channel listeningChannel;
     @VisibleForTesting
-    ChannelGroup channelGroup;
+    ChannelGroup allChannels;
 
     private RequestProcessor requestProcessor;
 
@@ -74,38 +76,57 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
 
     private MetricsRegistry metrics;
 
+    private static final AttributeKey<TSOChannelContext> TSO_CTX =
+            AttributeKey.valueOf("TSO_CTX");
+
     @Inject
     public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
 
         this.config = config;
         this.metrics = metrics;
         this.requestProcessor = requestProcessor;
-        // Setup netty listener
-        this.factory = new NioServerSocketChannelFactory(
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
-                (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
-
-        this.bootstrap = new ServerBootstrap(factory);
-        bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
 
+        // Setup netty listener
+        int workerThreadCount= (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;
+        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
+        EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);
+
+        this.bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,  workerGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                // Max packet length is 10MB. Transactions with so many cells
+                // that the packet is rejected will receive a ServiceUnavailableException.
+                // 10MB is enough for 2 million cells in a transaction though.
+                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
+                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
+                pipeline.addLast("protobufencoder", new ProtobufEncoder());
+                pipeline.addLast("handler", TSOChannelHandler.this);
+            }
+        });
     }
 
     /**
      * Allows to create and connect the communication channel closing the previous one if existed
      */
     void reconnect() {
-        if (listeningChannel == null && channelGroup == null) {
+        if (listeningChannel == null && allChannels == null) {
             LOG.debug("Creating communication channel...");
         } else {
             LOG.debug("Reconnecting communication channel...");
             closeConnection();
         }
         // Create the global ChannelGroup
-        channelGroup = new DefaultChannelGroup(TSOChannelHandler.class.getName());
+        allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
         LOG.debug("\tCreating channel to listening for incoming connections in port {}", config.getPort());
-        listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort()));
-        channelGroup.add(listeningChannel);
+        listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort())).syncUninterruptibly().channel();
+        allChannels.add(listeningChannel);
         LOG.debug("\tListening channel created and connected: {}", listeningChannel);
     }
 
@@ -114,15 +135,10 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
      */
     void closeConnection() {
         LOG.debug("Closing communication channel...");
-        if (listeningChannel != null) {
-            LOG.debug("\tUnbinding listening channel {}", listeningChannel);
-            listeningChannel.unbind().awaitUninterruptibly();
-            LOG.debug("\tListening channel {} unbound", listeningChannel);
-        }
-        if (channelGroup != null) {
-            LOG.debug("\tClosing channel group {}", channelGroup);
-            channelGroup.close().awaitUninterruptibly();
-            LOG.debug("\tChannel group {} closed", channelGroup);
+        if (allChannels != null) {
+            LOG.debug("\tClosing channel group {}", allChannels);
+            allChannels.close().awaitUninterruptibly();
+            LOG.debug("\tChannel group {} closed", allChannels);
         }
     }
 
@@ -131,28 +147,22 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
     // ----------------------------------------------------------------------------------------------------------------
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        channelGroup.add(ctx.getChannel());
-        LOG.debug("TSO channel connected: {}", ctx.getChannel());
-    }
-
-    @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        channelGroup.remove(ctx.getChannel());
-        LOG.debug("TSO channel disconnected: {}", ctx.getChannel());
+    public void channelActive(ChannelHandlerContext ctx) {
+        allChannels.add(ctx.channel());
+        LOG.debug("TSO channel active: {}", ctx.channel());
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        LOG.debug("TSO channel closed: {}", ctx.getChannel());
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        //ChannelGroup will automatically remove closed Channels
+        LOG.debug("TSO channel inactive: {}", ctx.channel());
     }
 
     /**
      * Handle received messages
      */
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
         if (msg instanceof TSOProto.Request) {
             TSOProto.Request request = (TSOProto.Request) msg;
             if (request.hasHandshakeRequest()) {
@@ -160,28 +170,28 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
                 return;
             }
             if (!handshakeCompleted(ctx)) {
-                LOG.error("Handshake not completed. Closing channel {}", ctx.getChannel());
-                ctx.getChannel().close();
+                LOG.error("Handshake not completed. Closing channel {}", ctx.channel());
+                ctx.channel().close();
             }
 
             if (request.hasTimestampRequest()) {
-                requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
+                requestProcessor.timestampRequest(ctx.channel(), MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasCommitRequest()) {
                 TSOProto.CommitRequest cr = request.getCommitRequest();
                 requestProcessor.commitRequest(cr.getStartTimestamp(),
                                                cr.getCellIdList(),
                                                cr.getTableIdList(),
                                                cr.getIsRetry(),
-                                               ctx.getChannel(),
+                                               ctx.channel(),
                                                MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasFenceRequest()) {
                 TSOProto.FenceRequest fr = request.getFenceRequest();
                 requestProcessor.fenceRequest(fr.getTableId(),
-                        ctx.getChannel(),
+                        ctx.channel(),
                         MonitoringContextFactory.getInstance(config,metrics));
             } else {
-                LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
-                ctx.getChannel().close();
+                LOG.error("Invalid request {}. Closing channel {}", request, ctx.channel());
+                ctx.channel().close();
             }
         } else {
             LOG.error("Unknown message type", msg);
@@ -190,13 +200,13 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
 
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        if (e.getCause() instanceof ClosedChannelException) {
-            LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof ClosedChannelException) {
+            LOG.warn("ClosedChannelException caught. Cause: ", cause);
             return;
         }
-        LOG.warn("Unexpected exception from downstream. Closing channel {} {}", ctx.getChannel(), e.getCause());
-        ctx.getChannel().close();
+        LOG.warn("Unexpected exception. Closing channel {}", ctx.channel(), cause);
+        ctx.channel().close();
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -204,8 +214,12 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
     // ----------------------------------------------------------------------------------------------------------------
     @Override
     public void close() throws IOException {
-        closeConnection();
-        factory.releaseExternalResources();
+        LOG.debug("Shutting down communication channel...");
+        bootstrap.config().group().shutdownGracefully();
+        bootstrap.config().childGroup().shutdownGracefully();
+
+        bootstrap.config().group().terminationFuture().awaitUninterruptibly();
+        bootstrap.config().childGroup().terminationFuture().awaitUninterruptibly();
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -242,51 +256,22 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
                     .setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
             TSOChannelContext tsoCtx = new TSOChannelContext();
             tsoCtx.setHandshakeComplete();
-            ctx.setAttachment(tsoCtx);
+            ctx.channel().attr(TSO_CTX).set(tsoCtx);
         } else {
             response.setClientCompatible(false);
         }
         response.setLowLatency(config.getLowLatency());
-        ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
+        ctx.channel().writeAndFlush(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
 
     }
 
     private boolean handshakeCompleted(ChannelHandlerContext ctx) {
 
-        Object o = ctx.getAttachment();
-        if (o instanceof TSOChannelContext) {
-            TSOChannelContext tsoCtx = (TSOChannelContext) o;
+        TSOChannelContext tsoCtx = ctx.channel().attr(TSO_CTX).get();
+        if (tsoCtx != null) {
             return tsoCtx.getHandshakeComplete();
         }
         return false;
-
-    }
-
-    /**
-     * Netty pipeline configuration
-     */
-    static class TSOPipelineFactory implements ChannelPipelineFactory {
-
-        private final ChannelHandler handler;
-
-        TSOPipelineFactory(ChannelHandler handler) {
-            this.handler = handler;
-        }
-
-        public ChannelPipeline getPipeline() throws Exception {
-
-            ChannelPipeline pipeline = Channels.pipeline();
-            // Max packet length is 10MB. Transactions with so many cells
-            // that the packet is rejected will receive a ServiceUnavailableException.
-            // 10MB is enough for 2 million cells in a transaction though.
-            pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
-            pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-            pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
-            pipeline.addLast("protobufencoder", new ProtobufEncoder());
-            pipeline.addLast("handler", handler);
-
-            return pipeline;
-        }
     }
 
 }
diff --git a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
index 0e2c9f6..7142ef1 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
@@ -20,18 +20,27 @@ package org.apache.omid.tso;
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.omid.proto.TSOProto;
 import org.apache.omid.tso.ProgrammableTSOServer.Response.ResponseType;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,36 +49,51 @@ import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * Used in tests. Allows to program the set of responses returned by a TSO
  */
-public class ProgrammableTSOServer extends SimpleChannelHandler {
+@Sharable
+public class ProgrammableTSOServer extends ChannelInboundHandlerAdapter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ProgrammableTSOServer.class);
 
-    private ChannelFactory factory;
-    private ChannelGroup channelGroup;
+    private ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
     private Queue<Response> responseQueue = new LinkedList<>();
 
+    private static final AttributeKey<TSOChannelContext> TSO_CTX =
+            AttributeKey.valueOf("TSO_CTX");
+
     @Inject
     public ProgrammableTSOServer(int port) {
         // Setup netty listener
-        factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                .setNameFormat("boss-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                .setNameFormat("worker-%d").build()), (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
-
-        // Create the global ChannelGroup
-        channelGroup = new DefaultChannelGroup(ProgrammableTSOServer.class.getName());
 
-        ServerBootstrap bootstrap = new ServerBootstrap(factory);
-        bootstrap.setPipelineFactory(new TSOChannelHandler.TSOPipelineFactory(this));
+        int workerThreadCount = (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;
+        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
+        EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);
+
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,  workerGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
+                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
+                pipeline.addLast("protobufencoder", new ProtobufEncoder());
+                pipeline.addLast("handler", ProgrammableTSOServer.this);
+            }
+        });
 
         // Add the parent channel to the group
-        Channel channel = bootstrap.bind(new InetSocketAddress(port));
-        channelGroup.add(channel);
+        Channel channel = bootstrap.bind(new InetSocketAddress(port)).syncUninterruptibly().channel();
+        allChannels.add(channel);
 
         LOG.info("********** Dumb TSO Server running on port {} **********", port);
     }
@@ -96,24 +120,18 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
     // ******************** End of Main interface for tests *******************
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        channelGroup.add(ctx.getChannel());
-    }
-
-    @Override
-    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-        channelGroup.remove(ctx.getChannel());
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        allChannels.add(ctx.channel());
     }
 
     /**
      * Handle received messages
      */
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
         if (msg instanceof TSOProto.Request) {
             TSOProto.Request request = (TSOProto.Request) msg;
-            Channel channel = ctx.getChannel();
+            Channel channel = ctx.channel();
             if (request.hasHandshakeRequest()) {
                 checkHandshake(ctx, request.getHandshakeRequest());
                 return;
@@ -148,7 +166,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
                 }
             } else {
                 LOG.error("Invalid request {}", request);
-                ctx.getChannel().close();
+                ctx.channel().close();
             }
         } else {
             LOG.error("Unknown message type", msg);
@@ -156,12 +174,12 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        if (e.getCause() instanceof ClosedChannelException) {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof ClosedChannelException) {
             return;
         }
-        LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
-        Channels.close(e.getChannel());
+        LOG.warn("TSOHandler: Unexpected exception.", cause);
+        ctx.channel().close();
     }
 
     private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
@@ -171,15 +189,15 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
             response.setClientCompatible(true).setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
             TSOChannelContext tsoCtx = new TSOChannelContext();
             tsoCtx.setHandshakeComplete();
-            ctx.setAttachment(tsoCtx);
+            ctx.channel().attr(TSO_CTX).set(tsoCtx);
         } else {
             response.setClientCompatible(false);
         }
-        ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
+        ctx.channel().writeAndFlush(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
     }
 
     private boolean handshakeCompleted(ChannelHandlerContext ctx) {
-        Object o = ctx.getAttachment();
+        Object o = ctx.channel().attr(TSO_CTX).get();
         if (o instanceof TSOChannelContext) {
             TSOChannelContext tsoCtx = (TSOChannelContext) o;
             return tsoCtx.getHandshakeComplete();
@@ -192,7 +210,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
         respBuilder.setStartTimestamp(startTimestamp);
         builder.setTimestampResponse(respBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
     }
 
     private void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
@@ -200,7 +218,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
         commitBuilder.setAborted(false).setStartTimestamp(startTimestamp).setCommitTimestamp(commitTimestamp);
         builder.setCommitResponse(commitBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
     }
 
     private void sendAbortResponse(long startTimestamp, Channel c) {
@@ -208,7 +226,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
         commitBuilder.setAborted(true).setStartTimestamp(startTimestamp);
         builder.setCommitResponse(commitBuilder.build());
-        c.write(builder.build());
+        c.writeAndFlush(builder.build());
     }
 
     private static class TSOChannelContext {
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 3e35f93..da3351c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -19,7 +19,7 @@ package org.apache.omid.tso;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.commons.pool2.PooledObject;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.mockito.Mock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index be15c62..e98336e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -22,7 +22,7 @@ import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index c99dd25..02a6790 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -22,7 +22,7 @@ import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 977f630..d1c5ade 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -23,7 +23,7 @@ import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 8f7a5a8..e8c621f 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -24,7 +24,7 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableF
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.metrics.NullMetricsProvider;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 1b84b31..370b27b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -25,7 +25,7 @@ import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.committable.InMemoryCommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 88988e1..cae9c85 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -21,21 +21,25 @@ import org.apache.omid.NetworkUtils;
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.slf4j.Logger;
@@ -48,6 +52,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Random;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -92,45 +97,45 @@ public class TestTSOChannelHandlerNetty {
         try {
             // Check initial state
             assertNull(channelHandler.listeningChannel);
-            assertNull(channelHandler.channelGroup);
+            assertNull(channelHandler.allChannels);
 
             // Check initial connection
             channelHandler.reconnect();
             assertTrue(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 1);
-            assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), port);
+            assertEquals(channelHandler.allChannels.size(), 1);
+            assertEquals(((InetSocketAddress) channelHandler.listeningChannel.localAddress()).getPort(), port);
 
             // Check connection close
             channelHandler.closeConnection();
             assertFalse(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 0);
+            assertEquals(channelHandler.allChannels.size(), 0);
 
             // Check re-closing connection
             channelHandler.closeConnection();
             assertFalse(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 0);
+            assertEquals(channelHandler.allChannels.size(), 0);
 
             // Check connection after closing
             channelHandler.reconnect();
             assertTrue(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 1);
+            assertEquals(channelHandler.allChannels.size(), 1);
 
             // Check re-connection
             channelHandler.reconnect();
             assertTrue(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 1);
+            assertEquals(channelHandler.allChannels.size(), 1);
 
             // Exercise closeable with re-connection trial
             channelHandler.close();
             assertFalse(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 0);
+            assertEquals(channelHandler.allChannels.size(), 0);
             try {
                 channelHandler.reconnect();
                 fail("Can't reconnect after closing");
-            } catch (ChannelException e) {
+            } catch (Exception e) {
                 // Expected: Can't reconnect after closing
                 assertFalse(channelHandler.listeningChannel.isOpen());
-                assertEquals(channelHandler.channelGroup.size(), 0);
+                assertEquals(channelHandler.allChannels.size(), 0);
             }
         } finally {
             if(channelHandler != null) channelHandler.close();
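
The size assertions above rely on the self-maintaining behaviour of a Netty 4 channel group: a channel removes itself from the group when it closes, so allChannels.size() drops back to 0 once the listening channel is closed. A sketch of that pattern, assuming allChannels is a DefaultChannelGroup (the actual field is declared in TSOChannelHandler.java, outside the hunks shown here):

    import io.netty.channel.Channel;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;

    // Illustrative sketch of a self-cleaning channel group.
    final class ChannelGroupExample {
        private final ChannelGroup allChannels =
                new DefaultChannelGroup("tso-channels", GlobalEventExecutor.INSTANCE);

        void onChannelActive(Channel channel) {
            allChannels.add(channel);   // removed automatically when the channel closes
        }

        void closeConnection() {
            // Closes the listening channel and any connected client channels.
            allChannels.close().awaitUninterruptibly();
        }
    }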
@@ -142,7 +147,7 @@ public class TestTSOChannelHandlerNetty {
         int port = NetworkUtils.getFreePort();
         TSOChannelHandler channelHandler = getTSOChannelHandler(port);
         try {
-            ClientBootstrap nettyClient = createNettyClientBootstrap();
+            Bootstrap nettyClient = createNettyClientBootstrap();
 
             ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
 
@@ -158,7 +163,7 @@ public class TestTSOChannelHandlerNetty {
             channelHandler.reconnect();
             assertTrue(channelHandler.listeningChannel.isOpen());
             // Eventually the channel group of the server should contain the listening channel
-            assertEquals(channelHandler.channelGroup.size(), 1);
+            assertEquals(channelHandler.allChannels.size(), 1);
 
             // ------------------------------------------------------------------------------------------------------------
             // Test that a client can connect now
@@ -166,16 +171,16 @@ public class TestTSOChannelHandlerNetty {
             channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
             while (!channelF.isDone()) /** do nothing */ ;
             assertTrue(channelF.isSuccess());
-            assertTrue(channelF.getChannel().isConnected());
+            assertTrue(channelF.channel().isActive());
             // Eventually the channel group of the server should have 2 elements
-            while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+            while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
 
             // ------------------------------------------------------------------------------------------------------------
             // Close the channel on the client side and test we have one element less in the channel group
             // ------------------------------------------------------------------------------------------------------------
-            channelF.getChannel().close().await();
+            channelF.channel().close().await();
             // Eventually the channel group of the server should have only one element
-            while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
+            while (channelHandler.allChannels.size() != 1) /** do nothing */ ;
 
             // ------------------------------------------------------------------------------------------------------------
             // Open a new channel and test the connection closing on the server side through the channel handler
@@ -184,13 +189,13 @@ public class TestTSOChannelHandlerNetty {
             while (!channelF.isDone()) /** do nothing */ ;
             assertTrue(channelF.isSuccess());
             // Eventually the channel group of the server should have 2 elements again
-            while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+            while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
             channelHandler.closeConnection();
             assertFalse(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 0);
+            assertEquals(channelHandler.allChannels.size(), 0);
             // Wait some time and check the channel was closed
             TimeUnit.SECONDS.sleep(1);
-            assertFalse(channelF.getChannel().isOpen());
+            assertFalse(channelF.channel().isOpen());
 
             // ------------------------------------------------------------------------------------------------------------
             // Test server re-connections with connected clients
@@ -199,28 +204,28 @@ public class TestTSOChannelHandlerNetty {
             channelHandler.reconnect();
             assertTrue(channelHandler.listeningChannel.isOpen());
             // Eventually the channel group of the server should contain the listening channel
-            assertEquals(channelHandler.channelGroup.size(), 1);
+            assertEquals(channelHandler.allChannels.size(), 1);
             // Check the client can connect
             channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
             while (!channelF.isDone()) /** do nothing */ ;
             assertTrue(channelF.isSuccess());
             // Eventually the channel group of the server should have 2 elements
-            while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+            while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
             // Re-connect and check that client connection was gone
             channelHandler.reconnect();
             assertTrue(channelHandler.listeningChannel.isOpen());
             // Eventually the channel group of the server should contain the listening channel
-            assertEquals(channelHandler.channelGroup.size(), 1);
+            assertEquals(channelHandler.allChannels.size(), 1);
             // Wait some time and check the channel was closed
             TimeUnit.SECONDS.sleep(1);
-            assertFalse(channelF.getChannel().isOpen());
+            assertFalse(channelF.channel().isOpen());
 
             // ------------------------------------------------------------------------------------------------------------
             // Test closeable interface with re-connection trial
             // ------------------------------------------------------------------------------------------------------------
             channelHandler.close();
             assertFalse(channelHandler.listeningChannel.isOpen());
-            assertEquals(channelHandler.channelGroup.size(), 0);
+            assertEquals(channelHandler.allChannels.size(), 0);
         } finally {
             if (channelHandler != null) channelHandler.close();
         }
@@ -239,20 +244,20 @@ public class TestTSOChannelHandlerNetty {
             // Connect channel handler
             channelHandler.reconnect();
             // Create client and connect it
-            ClientBootstrap nettyClient = createNettyClientBootstrap();
+            Bootstrap nettyClient = createNettyClientBootstrap();
             ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
             // Basic checks for connection
             while (!channelF.isDone()) /** do nothing */ ;
             assertTrue(channelF.isSuccess());
-            assertTrue(channelF.getChannel().isConnected());
-            Channel channel = channelF.getChannel();
+            assertTrue(channelF.channel().isActive());
+            Channel channel = channelF.channel();
             // Eventually the channel group of the server should have 2 elements
-            while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+            while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
             // Write first handshake request
             TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
             // NOTE: Add here the required handshake capabilities when necessary
             handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
-            channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+            channelF.channel().writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
 
             // ------------------------------------------------------------------------------------------------------------
             // Test channel writing
@@ -275,7 +280,7 @@ public class TestTSOChannelHandlerNetty {
         TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
         tsBuilder.setTimestampRequest(tsRequestBuilder.build());
         // Write into the channel
-        channel.write(tsBuilder.build()).await();
+        channel.writeAndFlush(tsBuilder.build()).await();
         verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(0))
                 .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
@@ -292,7 +297,7 @@ public class TestTSOChannelHandlerNetty {
         TSOProto.Request r = commitBuilder.build();
         assertTrue(r.hasCommitRequest());
         // Write into the channel
-        channel.write(commitBuilder.build()).await();
+        channel.writeAndFlush(commitBuilder.build()).await();
         verify(requestProcessor, timeout(100).times(0)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
                 .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
@@ -308,7 +313,7 @@ public class TestTSOChannelHandlerNetty {
         TSOProto.Request r = fenceBuilder.build();
         assertTrue(r.hasFenceRequest());
         // Write into the channel
-        channel.write(fenceBuilder.build()).await();
+        channel.writeAndFlush(fenceBuilder.build()).await();
         verify(requestProcessor, timeout(100).times(0)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
                 .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContextImpl.class));
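
Awaiting writeAndFlush() only guarantees the request has left the client, not that the server has dispatched it, which is why the assertions above use Mockito's timeout() verification mode rather than verifying immediately. A generic sketch of that pattern, using a hypothetical listener interface:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.timeout;
    import static org.mockito.Mockito.verify;

    // Hypothetical interface, used only to illustrate the verification mode.
    interface RequestListener {
        void onRequest(long id);
    }

    final class TimeoutVerificationExample {
        static void example() {
            RequestListener listener = mock(RequestListener.class);

            // Simulate an asynchronous caller that delivers the event slightly later.
            new Thread(() -> listener.onRequest(42L)).start();

            // Polls until the interaction happens, failing only after 100 ms elapse.
            verify(listener, timeout(100).times(1)).onRequest(42L);
        }
    }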
@@ -318,37 +323,43 @@ public class TestTSOChannelHandlerNetty {
     // Helper methods
     // ----------------------------------------------------------------------------------------------------------------
 
-    private ClientBootstrap createNettyClientBootstrap() {
-
-        ChannelFactory factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()), 1);
-        // Create the bootstrap
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("connectTimeoutMillis", 100);
-        ChannelPipeline pipeline = bootstrap.getPipeline();
-        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-        pipeline.addLast("handler", new SimpleChannelHandler() {
+    private Bootstrap createNettyClientBootstrap() {
 
-            @Override
-            public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-                LOG.info("Channel {} connected", ctx.getChannel());
-            }
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(1, workerThreadFactory);
 
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.option(ChannelOption.TCP_NODELAY, true);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
+
+        bootstrap.group(workerGroup);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
             @Override
-            public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-                LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+                pipeline.addLast("protobufencoder", new ProtobufEncoder());
+                pipeline.addLast("testhandler", new ChannelInboundHandlerAdapter() {
+
+                    @Override
+                    public void channelActive(ChannelHandlerContext ctx) {
+                        LOG.info("Channel {} active", ctx.channel());
+                    }
+
+                    @Override
+                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                        LOG.error("Error on channel {}", ctx.channel(), cause);
+                    }
+
+                });
             }
-
         });
+
         return bootstrap;
     }
 }
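
Two points about the Netty 4 client bootstrap above are worth spelling out: per-connection options move from string keys ("tcpNoDelay", "connectTimeoutMillis", ...) to typed ChannelOption constants, and the NioEventLoopGroup created in createNettyClientBootstrap() owns threads that live until shutdownGracefully() is called, a call the hunk above does not show. A minimal sketch of a bootstrap whose group is torn down explicitly, with hypothetical names:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    // Illustrative sketch only.
    final class ClientBootstrapExample {
        private final EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        Bootstrap newBootstrap() {
            return new Bootstrap()
                    .group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)            // was setOption("tcpNoDelay", true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)  // was setOption("connectTimeoutMillis", 100)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // codecs and handlers would be added here, as in the test above
                        }
                    });
        }

        void shutdown() {
            // Releases the event loop threads; otherwise they can keep the JVM alive.
            workerGroup.shutdownGracefully();
        }
    }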
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
index d40c26c..03f1609 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
@@ -21,21 +21,24 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableF
 import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.omid.proto.TSOProto;
 import org.apache.omid.proto.TSOProto.Response;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +48,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * Raw client for communicating with tso server directly with protobuf messages
@@ -58,39 +62,39 @@ public class TSOClientRaw {
     private final Channel channel;
 
     public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
-        // Start client with Nb of active threads = 3 as maximum.
-        ChannelFactory factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
-        // Create the bootstrap
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
 
         InetSocketAddress addr = new InetSocketAddress(host, port);
 
-        ChannelPipeline pipeline = bootstrap.getPipeline();
-        pipeline.addLast("lengthbaseddecoder",
-                new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("protobufdecoder",
-                new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
-        Handler handler = new Handler();
-        pipeline.addLast("handler", handler);
-
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("connectTimeoutMillis", 100);
+        // Start client with Nb of active threads = 3
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(3, workerThreadFactory);
+
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+                pipeline.addLast("protobufencoder", new ProtobufEncoder());
+                pipeline.addLast("rawHandler", new RawHandler());
+            }
+        });
+        bootstrap.option(ChannelOption.TCP_NODELAY, true);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
 
         ChannelFuture channelFuture = bootstrap.connect(addr).await();
-        channel = channelFuture.getChannel();
+        channel = channelFuture.channel();
+
     }
 
     public void write(TSOProto.Request request) {
-        channel.write(request);
+        channel.writeAndFlush(request);
     }
 
     public Future<Response> getResponse() throws InterruptedException {
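
In the constructor above, channelFuture.channel() returns a Channel object even when the connection attempt failed, so callers that need a usable connection typically check isSuccess() after await(), or call sync() to rethrow the failure. A hedged sketch of that connect sequence; 'bootstrap' stands for a fully configured Netty 4 Bootstrap:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;

    import java.net.InetSocketAddress;

    // Illustrative sketch only.
    final class ConnectExample {
        static Channel connectOrFail(Bootstrap bootstrap, String host, int port) throws InterruptedException {
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).await();
            if (!future.isSuccess()) {
                throw new IllegalStateException("Connection to " + host + ":" + port + " failed", future.cause());
            }
            return future.channel();
        }
    }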
@@ -104,12 +108,12 @@ public class TSOClientRaw {
         channel.close();
     }
 
-    private class Handler extends SimpleChannelHandler {
+    private class RawHandler extends ChannelInboundHandlerAdapter {
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-            LOG.info("Message received", e);
-            if (e.getMessage() instanceof Response) {
-                Response resp = (Response) e.getMessage();
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            LOG.info("Message received", msg);
+            if (msg instanceof Response) {
+                Response resp = (Response) msg;
                 try {
                     SettableFuture<Response> future = responseQueue.take();
                     future.set(resp);
@@ -118,16 +122,16 @@ public class TSOClientRaw {
                     LOG.warn("Interrupted in handler", ie);
                 }
             } else {
-                LOG.warn("Received unknown message", e.getMessage());
+                LOG.warn("Received unknown message", msg);
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-            LOG.info("Exception received", e.getCause());
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            LOG.info("Exception received", cause);
             try {
                 SettableFuture<Response> future = responseQueue.take();
-                future.setException(e.getCause());
+                future.setException(cause);
             } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
                 LOG.warn("Interrupted handling exception", ie);
@@ -135,9 +139,9 @@ public class TSOClientRaw {
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+        public void channelInactive(ChannelHandlerContext ctx)
                 throws Exception {
-            LOG.info("Disconnected");
+            LOG.info("Inactive");
             try {
                 SettableFuture<Response> future = responseQueue.take();
                 future.setException(new ConnectionException());