You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/03 14:11:23 UTC

[2/3] tajo git commit: TAJO-527: Upgrade to Netty 4

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 860bc8e..f0dcd26 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -19,6 +19,10 @@
 package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
+
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.*;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -53,15 +57,18 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.*;
 import java.net.InetSocketAddress;
@@ -72,16 +79,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
 public class TajoPullServerService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
@@ -93,9 +92,9 @@ public class TajoPullServerService extends AbstractService {
   public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
-  private HttpPipelineFactory pipelineFact;
+  private ServerBootstrap selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+  private HttpChannelInitializer channelInitializer;
   private int sslFileBufferSize;
 
   private ApplicationId appId;
@@ -131,7 +130,7 @@ public class TajoPullServerService extends AbstractService {
   }
 
   @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
-  static class ShuffleMetrics implements ChannelFutureListener {
+  static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> {
     @Metric({"OutputBytes","PullServer output in bytes"})
     MutableCounterLong shuffleOutputBytes;
     @Metric({"Failed","# of failed shuffle outputs"})
@@ -212,7 +211,10 @@ public class TajoPullServerService extends AbstractService {
       int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
           Runtime.getRuntime().availableProcessors() * 2);
 
-      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
+      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum)
+                   .option(ChannelOption.TCP_NODELAY, true)
+                   .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                   .childOption(ChannelOption.TCP_NODELAY, true);
 
       localFS = new LocalFileSystem();
 
@@ -228,23 +230,26 @@ public class TajoPullServerService extends AbstractService {
   // TODO change AbstractService to throw InterruptedException
   @Override
   public synchronized void serviceInit(Configuration conf) throws Exception {
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    ServerBootstrap bootstrap = selector.clone();
 
     try {
-      pipelineFact = new HttpPipelineFactory(conf);
+      channelInitializer = new HttpChannelInitializer(conf);
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
-    bootstrap.setPipelineFactory(pipelineFact);
+    bootstrap.childHandler(channelInitializer)
+      .channel(NioServerSocketChannel.class);
 
     port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
         ConfVars.PULLSERVER_PORT.defaultIntVal);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
+        .syncUninterruptibly();
 
-    accepted.add(ch);
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    accepted.add(future.channel());
+    port = ((InetSocketAddress)future.channel().localAddress()).getPort();
     conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
-    pipelineFact.PullServer.setPort(port);
+    channelInitializer.PullServer.setPort(port);
     LOG.info(getName() + " listening on port " + port);
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
@@ -314,10 +319,19 @@ public class TajoPullServerService extends AbstractService {
   @Override
   public synchronized void stop() {
     try {
-      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
-      pipelineFact.destroy();
+      accepted.close();
+      if (selector != null) {
+        if (selector.group() != null) {
+          selector.group().shutdownGracefully();
+        }
+        if (selector.childGroup() != null) {
+          selector.childGroup().shutdownGracefully();
+        }
+      }
+
+      if (channelInitializer != null) {
+        channelInitializer.destroy();
+      }
 
       localFS.close();
     } catch (Throwable t) {
@@ -337,12 +351,12 @@ public class TajoPullServerService extends AbstractService {
     }
   }
 
-  class HttpPipelineFactory implements ChannelPipelineFactory {
+  class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
 
     final PullServer PullServer;
     private SSLFactory sslFactory;
 
-    public HttpPipelineFactory(Configuration conf) throws Exception {
+    public HttpChannelInitializer(Configuration conf) throws Exception {
       PullServer = new PullServer(conf);
       if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
           ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
@@ -358,8 +372,8 @@ public class TajoPullServerService extends AbstractService {
     }
 
     @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
+    protected void initChannel(SocketChannel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
       if (sslFactory != null) {
         pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
       }
@@ -367,10 +381,9 @@ public class TajoPullServerService extends AbstractService {
       int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
           ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
       pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", PullServer);
-      return pipeline;
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
       // TODO factor out decode of index to permit alt. models
@@ -408,31 +421,31 @@ public class TajoPullServerService extends AbstractService {
       this.numFiles = numFiles;
       this.remainFiles = new AtomicInteger(numFiles);
     }
-    public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
-      synchronized(remainFiles) {
-        long fileSendTime = System.currentTimeMillis() - fileStartTime;
-        if (fileSendTime > 20 * 1000) {
-          LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount());
-          numSlowFile++;
-        }
-        if (fileSendTime > maxTime) {
-          maxTime = fileSendTime;
-        }
-        if (fileSendTime < minTime) {
-          minTime = fileSendTime;
-        }
-        int remain = remainFiles.decrementAndGet();
-        if (remain <= 0) {
-          processingStatusMap.remove(requestUri);
-          LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " +
-              "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " +
-              "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
-        }
+
+    public synchronized void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
+      long fileSendTime = System.currentTimeMillis() - fileStartTime;
+      if (fileSendTime > 20 * 1000) {
+        LOG.info("PullServer send too long time: filePos=" + filePart.position() + ", fileLen=" + filePart.count());
+        numSlowFile++;
+      }
+      if (fileSendTime > maxTime) {
+        maxTime = fileSendTime;
+      }
+      if (fileSendTime < minTime) {
+        minTime = fileSendTime;
+      }
+      int remain = remainFiles.decrementAndGet();
+      if (remain <= 0) {
+        processingStatusMap.remove(requestUri);
+        LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, "
+            + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, "
+            + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
       }
     }
   }
 
-  class PullServer extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
 
     private final Configuration conf;
 //    private final IndexCache indexCache;
@@ -466,69 +479,58 @@ public class TajoPullServerService extends AbstractService {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
-        throws Exception {
-
-      accepted.add(evt.getChannel());
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.add(ctx.channel());
       LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
-      super.channelOpen(ctx, evt);
-
+      super.channelRegistered(ctx);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
+    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
+            throws Exception {
 
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (request.getMethod() != GET) {
-        sendError(ctx, METHOD_NOT_ALLOWED);
+      if (request.getMethod() != HttpMethod.GET) {
+        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
         return;
       }
 
       ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
       processingStatusMap.put(request.getUri().toString(), processingStatus);
       // Parsing the URL into key-values
-      final Map<String, List<String>> params =
-          new QueryStringDecoder(request.getUri()).getParameters();
+      final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
       final List<String> types = params.get("type");
       final List<String> qids = params.get("qid");
       final List<String> taskIdList = params.get("ta");
-      final List<String> stageIds = params.get("sid");
+      final List<String> subQueryIds = params.get("sid");
       final List<String> partIds = params.get("p");
       final List<String> offsetList = params.get("offset");
       final List<String> lengthList = params.get("length");
 
-      if (types == null || stageIds == null || qids == null || partIds == null) {
-        sendError(ctx, "Required queryId, type, stage Id, and part id",
-            BAD_REQUEST);
+      if (types == null || subQueryIds == null || qids == null || partIds == null) {
+        sendError(ctx, "Required queryId, type, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
-      if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
-        sendError(ctx, "Required qids, type, taskIds, stage Id, and part id",
-            BAD_REQUEST);
+      if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+        sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
       String partId = partIds.get(0);
       String queryId = qids.get(0);
       String shuffleType = types.get(0);
-      String sid = stageIds.get(0);
+      String sid = subQueryIds.get(0);
 
       long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
       long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
 
-      if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) {
-        sendError(ctx, "Required taskIds", BAD_REQUEST);
-      }
-
       List<String> taskIds = splitMaps(taskIdList);
 
       String queryBaseDir = queryId.toString() + "/output";
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("PullServer request param: shuffleType=" + shuffleType +
-            ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
+        LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+            + ", taskIds=" + taskIdList);
 
         // the working dir of tajo worker for each query
         LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
@@ -539,13 +541,14 @@ public class TajoPullServerService extends AbstractService {
       // if a stage requires a range shuffle
       if (shuffleType.equals("r")) {
         String ta = taskIds.get(0);
-        if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){
-          LOG.warn(e);
-          sendError(ctx, NO_CONTENT);
+        String pathString = queryBaseDir + "/" + sid + "/" + ta + "/output/";
+        if (!lDirAlloc.ifExists(pathString, conf)) {
+          LOG.warn(pathString + "does not exist.");
+          sendError(ctx, HttpResponseStatus.NO_CONTENT);
           return;
         }
-        Path path = localFS.makeQualified(
-            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
+        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
+            + "/output/", conf));
         String startKey = params.get("start").get(0);
         String endKey = params.get("end").get(0);
         boolean last = params.get("final") != null;
@@ -555,7 +558,7 @@ public class TajoPullServerService extends AbstractService {
           chunk = getFileCunks(path, startKey, endKey, last);
         } catch (Throwable t) {
           LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+          sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
           return;
         }
         if (chunk != null) {
@@ -568,7 +571,7 @@ public class TajoPullServerService extends AbstractService {
         String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
         if (!lDirAlloc.ifExists(partPath, conf)) {
           LOG.warn("Partition shuffle file not exists: " + partPath);
-          sendError(ctx, NO_CONTENT);
+          sendError(ctx, HttpResponseStatus.NO_CONTENT);
           return;
         }
 
@@ -581,7 +584,7 @@ public class TajoPullServerService extends AbstractService {
         if (startPos >= file.length()) {
           String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
           LOG.error(errorMessage);
-          sendError(ctx, errorMessage, BAD_REQUEST);
+          sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
           return;
         }
         LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
@@ -589,44 +592,53 @@ public class TajoPullServerService extends AbstractService {
         chunks.add(chunk);
       } else {
         LOG.error("Unknown shuffle type: " + shuffleType);
-        sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
+        sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST);
         return;
       }
 
       processingStatus.setNumFiles(chunks.size());
       processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
       // Write the content.
-      Channel ch = e.getChannel();
       if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-        ch.write(response);
-        if (!isKeepAlive(request)) {
-          ch.close();
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+
+        if (!HttpHeaders.isKeepAlive(request)) {
+          ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+        } else {
+          response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+          ctx.writeAndFlush(response);
         }
-      }  else {
+      } else {
         FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+        ChannelFuture writeFuture = null;
+        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
         long totalSize = 0;
         for (FileChunk chunk : file) {
           totalSize += chunk.length();
         }
-        setContentLength(response, totalSize);
+        HttpHeaders.setContentLength(response, totalSize);
 
+        if (HttpHeaders.isKeepAlive(request)) {
+          response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        }
         // Write the initial line and the header.
-        ch.write(response);
-
-        ChannelFuture writeFuture = null;
+        writeFuture = ctx.write(response);
 
         for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
+          writeFuture = sendFile(ctx, chunk, request.getUri().toString());
           if (writeFuture == null) {
-            sendError(ctx, NOT_FOUND);
+            sendError(ctx, HttpResponseStatus.NOT_FOUND);
             return;
           }
         }
+        if (ctx.pipeline().get(SslHandler.class) == null) {
+          writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+        } else {
+          ctx.flush();
+        }
 
         // Decide whether to close the connection or not.
-        if (!isKeepAlive(request)) {
+        if (!HttpHeaders.isKeepAlive(request)) {
           // Close the connection when the whole content is written out.
           writeFuture.addListener(ChannelFutureListener.CLOSE);
         }
@@ -634,19 +646,18 @@ public class TajoPullServerService extends AbstractService {
     }
 
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   Channel ch,
                                    FileChunk file,
                                    String requestUri) throws IOException {
       long startTime = System.currentTimeMillis();
-      RandomAccessFile spill = null;
+      RandomAccessFile spill = null;      
       ChannelFuture writeFuture;
       try {
         spill = new RandomAccessFile(file.getFile(), "r");
-        if (ch.getPipeline().get(SslHandler.class) == null) {
+        if (ctx.pipeline().get(SslHandler.class) == null) {
           final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
               file.startOffset(), file.length(), manageOsCache, readaheadLength,
               readaheadPool, file.getFile().getAbsolutePath());
-          writeFuture = ch.write(filePart);
+          writeFuture = ctx.write(filePart);
           writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
         } else {
           // HTTPS cannot be done with zero copy.
@@ -654,7 +665,7 @@ public class TajoPullServerService extends AbstractService {
               file.startOffset(), file.length(), sslFileBufferSize,
               manageOsCache, readaheadLength, readaheadPool,
               file.getFile().getAbsolutePath());
-          writeFuture = ch.write(chunk);
+          writeFuture = ctx.write(new HttpChunkedInput(chunk));
         }
       } catch (FileNotFoundException e) {
         LOG.info(file.getFile() + " not found");
@@ -678,22 +689,20 @@ public class TajoPullServerService extends AbstractService {
 
     private void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      response.setContent(
-        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+          Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+      response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
 
       // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+      ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      LOG.error(e.getCause().getMessage(), e.getCause());
-      //if channel.close() is not called, never closed files in this request
-      if (ctx.getChannel().isConnected()){
-        ctx.getChannel().close();
+      LOG.error(cause.getMessage(), cause);
+      if (ctx.channel().isOpen()) {
+        ctx.channel().close();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
index 5591bba..fb91094 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -20,6 +20,7 @@ package org.apache.tajo.pullserver.retriever;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.ExecutionBlockId;
@@ -27,9 +28,10 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.pullserver.FileAccessForbiddenException;
 import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.QueryStringDecoder;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -67,7 +69,7 @@ public class AdvancedDataRetriever implements DataRetriever {
       throws IOException {
 
     final Map<String, List<String>> params =
-      new QueryStringDecoder(request.getUri()).getParameters();
+      new QueryStringDecoder(request.getUri()).parameters();
 
     if (!params.containsKey("qid")) {
       throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
index 8f55f7b..0a1ad41 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.pullserver.retriever;
 
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
index dc63929..e26bcd6 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
@@ -18,11 +18,12 @@
 
 package org.apache.tajo.pullserver.retriever;
 
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
 import org.apache.tajo.pullserver.FileAccessForbiddenException;
 import org.apache.tajo.pullserver.HttpDataServerHandler;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index d0037ca..2dc3765 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -138,7 +138,15 @@
   <dependencies>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-transport</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
     </dependency>
     <dependency>
       <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 4b1842e..5845229 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -20,12 +20,15 @@ package org.apache.tajo.rpc;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.*;
+
+import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -38,8 +41,7 @@ import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 public class AsyncRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
 
-  private final ChannelUpstreamHandler handler;
-  private final ChannelPipelineFactory pipeFactory;
+  private final ChannelInitializer<Channel> initializer;
   private final ProxyRpcChannel rpcChannel;
 
   private final AtomicInteger sequence = new AtomicInteger(0);
@@ -56,7 +58,7 @@ public class AsyncRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   AsyncRpcClient(final Class<?> protocol,
-                        final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+                        final InetSocketAddress addr, int retries)
       throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
 
     this.protocol = protocol;
@@ -65,10 +67,9 @@ public class AsyncRpcClient extends NettyClientBase {
     Class<?> serviceClass = Class.forName(serviceClassName);
     stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
 
-    this.handler = new ClientChannelUpstreamHandler();
-    pipeFactory = new ProtoPipelineFactory(handler,
+    initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), 
         RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory, factory, retries);
+    super.init(addr, initializer, retries);
     rpcChannel = new ProxyRpcChannel();
     this.key = new RpcConnectionKey(addr, protocol, true);
   }
@@ -83,7 +84,7 @@ public class AsyncRpcClient extends NettyClientBase {
     try {
       return (T) stubMethod.invoke(null, rpcChannel);
     } catch (Exception e) {
-      throw new RuntimeException(e.getMessage(), e);
+      throw new RemoteException(e.getMessage(), e);
     }
   }
 
@@ -91,12 +92,32 @@ public class AsyncRpcClient extends NettyClientBase {
     return this.rpcChannel;
   }
 
+  /** Fails every pending request by running its callback with an error response carrying the given text. */
+  protected void sendExceptions(String message) {
+    // Generated protobuf setters reject null, and Throwable#getMessage() may be null.
+    final String errorMessage = (message == null) ? "Unknown RPC error" : message;
+    for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
+      ResponseCallback callback = callbackEntry.getValue();
+      RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
+          .setErrorMessage(errorMessage)
+          .setId(callbackEntry.getKey());
+      callback.run(responseBuilder.build());
+    }
+  }
+
+  @Override
+  public void close() {
+    sendExceptions("AsyncRpcClient terminates all the connections"); // fail every pending callback first
+
+    super.close(); // then let the base class run its own close logic
+  }
+
   private class ProxyRpcChannel implements RpcChannel {
-    private final ClientChannelUpstreamHandler handler;
+    private final ClientChannelInboundHandler handler;
 
     public ProxyRpcChannel() {
-      this.handler = getChannel().getPipeline()
-          .get(ClientChannelUpstreamHandler.class);
+      this.handler = getChannel().pipeline()
+          .get(ClientChannelInboundHandler.class);
 
       if (handler == null) {
         throw new IllegalArgumentException("Channel does not have " +
@@ -117,7 +138,17 @@ public class AsyncRpcClient extends NettyClientBase {
       handler.registerCallback(nextSeqId,
           new ResponseCallback(controller, responseType, done));
 
-      getChannel().write(rpcRequest);
+      ChannelPromise channelPromise = getChannel().newPromise();
+      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            handler.exceptionCaught(null, new ServiceException(future.cause()));
+          }
+        }
+      });
+      getChannel().writeAndFlush(rpcRequest, channelPromise);
     }
 
     private Message buildRequest(int seqId,
@@ -180,10 +211,11 @@ public class AsyncRpcClient extends NettyClientBase {
   private String getErrorMessage(String message) {
     return "Exception [" + protocol.getCanonicalName() +
         "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-        getChannel().getRemoteAddress()) + ")]: " + message;
+        getChannel().remoteAddress()) + ")]: " + message;
   }
 
-  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
 
     synchronized void registerCallback(int seqId, ResponseCallback callback) {
 
@@ -196,37 +228,39 @@ public class AsyncRpcClient extends NettyClientBase {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      RpcResponse response = (RpcResponse) e.getMessage();
-      ResponseCallback callback = requests.remove(response.getId());
+      if (msg instanceof RpcResponse) {
+        try {
+          RpcResponse response = (RpcResponse) msg;
+          ResponseCallback callback = requests.remove(response.getId());
 
-      if (callback == null) {
-        LOG.warn("Dangling rpc call");
-      } else {
-        callback.run(response);
+          if (callback == null) {
+            LOG.warn("Dangling rpc call");
+          } else {
+            callback.run(response);
+          }
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
       }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
-
-      for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
-        ResponseCallback callback = callbackEntry.getValue();
-        Integer id = callbackEntry.getKey();
-
-        RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
-            .setErrorMessage(e.toString())
-            .setId(id);
+      LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
 
-        callback.run(responseBuilder.build());
-      }
+      sendExceptions(cause.getMessage());
+      
       if(LOG.isDebugEnabled()) {
-        LOG.error("" + e.getCause(), e.getCause());
+        LOG.error(cause.getMessage(), cause);
       } else {
-        LOG.error("RPC Exception:" + e.getCause());
+        LOG.error("RPC Exception:" + cause.getMessage());
+      }
+      
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index f9c5d3b..3b5a747 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -18,16 +18,16 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.*;
 import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
+
+import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
+
+import io.netty.util.ReferenceCountUtil;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -36,7 +36,7 @@ public class AsyncRpcServer extends NettyServerBase {
   private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
 
   private final Service service;
-  private final ChannelPipelineFactory pipeline;
+  private final ChannelInitializer<Channel> initializer;
 
   public AsyncRpcServer(final Class<?> protocol,
                         final Object instance,
@@ -52,87 +52,97 @@ public class AsyncRpcServer extends NettyServerBase {
     Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
     this.service = (Service) method.invoke(null, instance);
 
-    ServerHandler handler = new ServerHandler();
-    this.pipeline = new ProtoPipelineFactory(handler,
-        RpcRequest.getDefaultInstance());
-    super.init(this.pipeline, workerNum);
+    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+    super.init(this.initializer, workerNum);
   }
 
-  private class ServerHandler extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  private class ServerHandler extends ChannelInboundHandlerAdapter {
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
-        throws Exception {
-
-      accepted.add(evt.getChannel());
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.add(ctx.channel());
       if(LOG.isDebugEnabled()){
         LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
       }
-      super.channelOpen(ctx, evt);
+      super.channelRegistered(ctx);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.remove(ctx.channel()); // counterpart of channelRegistered: stop tracking this connection
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+      }
+      super.channelUnregistered(ctx); // delegate to the adapter's default handling
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, Object msg)
         throws Exception {
+      if (msg instanceof RpcRequest) {
+        try {
+          final RpcRequest request = (RpcRequest) msg;
 
-      final RpcRequest request = (RpcRequest) e.getMessage();
+          String methodName = request.getMethodName();
+          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
 
-      String methodName = request.getMethodName();
-      MethodDescriptor methodDescriptor = service.getDescriptorForType().
-          findMethodByName(methodName);
+          if (methodDescriptor == null) {
+            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+          }
 
-      if (methodDescriptor == null) {
-        throw new RemoteCallException(request.getId(),
-            new NoSuchMethodException(methodName));
-      }
+          Message paramProto = null;
+          if (request.hasRequestMessage()) {
+            try {
+              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+                  .mergeFrom(request.getRequestMessage()).build();
+            } catch (Throwable t) {
+              throw new RemoteCallException(request.getId(), methodDescriptor, t);
+            }
+          }
 
-      Message paramProto = null;
-      if (request.hasRequestMessage()) {
-        try {
-          paramProto = service.getRequestPrototype(methodDescriptor)
-                  .newBuilderForType().mergeFrom(request.getRequestMessage()).
-                  build();
-        } catch (Throwable t) {
-          throw new RemoteCallException(request.getId(), methodDescriptor, t);
-        }
-      }
+          final RpcController controller = new NettyRpcController();
 
-      final Channel channel = e.getChannel();
-      final RpcController controller = new NettyRpcController();
+          RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
 
-      RpcCallback<Message> callback =
-          !request.hasId() ? null : new RpcCallback<Message>() {
+            public void run(Message returnValue) {
 
-        public void run(Message returnValue) {
+              RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
 
-          RpcResponse.Builder builder = RpcResponse.newBuilder()
-              .setId(request.getId());
+              if (returnValue != null) {
+                builder.setResponseMessage(returnValue.toByteString());
+              }
 
-          if (returnValue != null) {
-            builder.setResponseMessage(returnValue.toByteString());
-          }
+              if (controller.failed()) {
+                builder.setErrorMessage(controller.errorText());
+              }
 
-          if (controller.failed()) {
-            builder.setErrorMessage(controller.errorText());
-          }
+              ctx.writeAndFlush(builder.build());
+            }
+          };
 
-          channel.write(builder.build());
-        }
-      };
+          service.callMethod(methodDescriptor, controller, paramProto, callback);
 
-      service.callMethod(methodDescriptor, controller, paramProto, callback);
+        } finally {
+          ReferenceCountUtil.release(msg);
+        }
+      }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception{
-
-      if (e.getCause() instanceof RemoteCallException) {
-        RemoteCallException callException = (RemoteCallException) e.getCause();
-        e.getChannel().write(callException.getResponse());
+      if (cause instanceof RemoteCallException) {
+        RemoteCallException callException = (RemoteCallException) cause;
+        ctx.writeAndFlush(callException.getResponse()); // report the failed call back to the client
       } else {
-        LOG.error(e.getCause());
+        LOG.error(cause.getMessage(), cause); // pass the throwable so the stack trace is logged; the message may be null
+      }
+      
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
       }
     }
+    
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 869919c..4ec5718 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,22 +18,23 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.*;
 import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+
+import io.netty.channel.*;
+import io.netty.util.concurrent.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import io.netty.util.ReferenceCountUtil;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.*;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
@@ -41,8 +42,7 @@ import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 public class BlockingRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(RpcProtos.class);
 
-  private final ChannelUpstreamHandler handler;
-  private final ChannelPipelineFactory pipeFactory;
+  private final ChannelInitializer<Channel> initializer;
   private final ProxyRpcChannel rpcChannel;
 
   private final AtomicInteger sequence = new AtomicInteger(0);
@@ -59,7 +59,7 @@ public class BlockingRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   BlockingRpcClient(final Class<?> protocol,
-                           final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+                           final InetSocketAddress addr, int retries)
       throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
 
     this.protocol = protocol;
@@ -69,10 +69,8 @@ public class BlockingRpcClient extends NettyClientBase {
     stubMethod = serviceClass.getMethod("newBlockingStub",
         BlockingRpcChannel.class);
 
-    this.handler = new ClientChannelUpstreamHandler();
-    pipeFactory = new ProtoPipelineFactory(handler,
-        RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory, factory, retries);
+    initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance());
+    super.init(addr, initializer, retries);
     rpcChannel = new ProxyRpcChannel();
 
     this.key = new RpcConnectionKey(addr, protocol, false);
@@ -96,14 +94,24 @@ public class BlockingRpcClient extends NettyClientBase {
     return this.rpcChannel;
   }
 
+  @Override
+  public void close() {
+    // Fail every call that is still registered, then delegate to the superclass close().
+    final String reason = "BlockingRpcClient terminates all the connections";
+    for (ProtoCallFuture pending : requests.values()) {
+      pending.setFailed(reason, new ServiceException(reason));
+    }
+    super.close();
+  }
+
   private class ProxyRpcChannel implements BlockingRpcChannel {
 
-    private final ClientChannelUpstreamHandler handler;
+    private final ClientChannelInboundHandler handler;
 
     public ProxyRpcChannel() {
 
-      this.handler = getChannel().getPipeline().
-          get(ClientChannelUpstreamHandler.class);
+      this.handler = getChannel().pipeline().
+          get(ClientChannelInboundHandler.class);
 
       if (handler == null) {
         throw new IllegalArgumentException("Channel does not have " +
@@ -125,10 +133,20 @@ public class BlockingRpcClient extends NettyClientBase {
       ProtoCallFuture callFuture =
           new ProtoCallFuture(controller, responsePrototype);
       requests.put(nextSeqId, callFuture);
-      getChannel().write(rpcRequest);
+
+      ChannelPromise channelPromise = getChannel().newPromise();
+      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            handler.exceptionCaught(null, new ServiceException(future.cause()));
+          }
+        }
+      });
+      getChannel().writeAndFlush(rpcRequest, channelPromise);
 
       try {
-        return callFuture.get();
+        return callFuture.get(60, TimeUnit.SECONDS);
       } catch (Throwable t) {
         if (t instanceof ExecutionException) {
           Throwable cause = t.getCause();
@@ -159,7 +177,7 @@ public class BlockingRpcClient extends NettyClientBase {
     if(protocol != null && getChannel() != null) {
       return protocol.getName() +
           "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-          getChannel().getRemoteAddress()) + "): " + message;
+          getChannel().remoteAddress()) + "): " + message;
     } else {
       return "Exception " + message;
     }
@@ -168,55 +186,64 @@ public class BlockingRpcClient extends NettyClientBase {
   private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
     if(protocol != null && getChannel() != null) {
       return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
-          RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
+          RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
     } else {
       return new TajoServiceException(response.getErrorMessage());
     }
   }
 
-  private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
 
-      RpcResponse rpcResponse = (RpcResponse) e.getMessage();
-      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+      if (msg instanceof RpcResponse) {
+        try {
+          RpcResponse rpcResponse = (RpcResponse) msg;
+          ProtoCallFuture callback = requests.remove(rpcResponse.getId());
 
-      if (callback == null) {
-        LOG.warn("Dangling rpc call");
-      } else {
-        if (rpcResponse.hasErrorMessage()) {
-          callback.setFailed(rpcResponse.getErrorMessage(),
-              makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
-          throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-        } else {
-          Message responseMessage;
-
-          if (!rpcResponse.hasResponseMessage()) {
-            responseMessage = null;
+          if (callback == null) {
+            LOG.warn("Dangling rpc call");
           } else {
-            responseMessage =
-                callback.returnType.newBuilderForType().
-                    mergeFrom(rpcResponse.getResponseMessage()).build();
+            if (rpcResponse.hasErrorMessage()) {
+              callback.setFailed(rpcResponse.getErrorMessage(),
+                  makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+              throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+            } else {
+              Message responseMessage;
+
+              if (!rpcResponse.hasResponseMessage()) {
+                responseMessage = null;
+              } else {
+                responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
+                    .build();
+              }
+
+              callback.setResponse(responseMessage);
+            }
           }
-
-          callback.setResponse(responseMessage);
+        } finally {
+          ReferenceCountUtil.release(msg);
         }
       }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      e.getChannel().close();
       for(ProtoCallFuture callback: requests.values()) {
-        callback.setFailed(e.getCause().getMessage(), e.getCause());
+        callback.setFailed(cause.getMessage(), cause);
       }
+      
       if(LOG.isDebugEnabled()) {
-        LOG.error("" + e.getCause().getMessage(), e.getCause());
+        LOG.error("" + cause.getMessage(), cause);
       } else {
-        LOG.error("RPC Exception:" + e.getCause().getMessage());
+        LOG.error("RPC Exception:" + cause.getMessage());
+      }
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
       }
     }
   }
@@ -253,6 +280,9 @@ public class BlockingRpcClient extends NettyClientBase {
     public Message get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
       if(sem.tryAcquire(timeout, unit)) {
+        if (ee != null) {
+          throw ee;
+        }
         return response;
       } else {
         throw new TimeoutException();

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 9e0d57c..0ce359f 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -22,19 +22,22 @@ import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
+
+import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
+import io.netty.util.ReferenceCountUtil;
+
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
 public class BlockingRpcServer extends NettyServerBase {
   private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
   private final BlockingService service;
-  private final ChannelPipelineFactory pipeline;
+  private final ChannelInitializer<Channel> initializer;
 
   public BlockingRpcServer(final Class<?> protocol,
                            final Object instance,
@@ -53,78 +56,92 @@ public class BlockingRpcServer extends NettyServerBase {
         "newReflectiveBlockingService", interfaceClass);
 
     this.service = (BlockingService) method.invoke(null, instance);
-    this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
-        RpcRequest.getDefaultInstance());
+    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
 
-    super.init(this.pipeline, workerNum);
+    super.init(this.initializer, workerNum);
   }
 
-  private class ServerHandler extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  private class ServerHandler extends ChannelInboundHandlerAdapter {
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
-        throws Exception {
-
-      accepted.add(evt.getChannel());
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.add(ctx.channel());
       if(LOG.isDebugEnabled()){
         LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
       }
-      super.channelOpen(ctx, evt);
+      super.channelRegistered(ctx);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-        throws Exception {
-      final RpcRequest request = (RpcRequest) e.getMessage();
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      accepted.remove(ctx.channel()); // counterpart of channelRegistered: stop tracking this connection
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+      }
+      super.channelUnregistered(ctx); // delegate to the adapter's default handling
+    }
 
-      String methodName = request.getMethodName();
-      MethodDescriptor methodDescriptor =
-          service.getDescriptorForType().findMethodByName(methodName);
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
 
-      if (methodDescriptor == null) {
-        throw new RemoteCallException(request.getId(),
-            new NoSuchMethodException(methodName));
-      }
-      Message paramProto = null;
-      if (request.hasRequestMessage()) {
+      if (msg instanceof RpcRequest) {
         try {
-          paramProto = service.getRequestPrototype(methodDescriptor)
-              .newBuilderForType().mergeFrom(request.getRequestMessage()).
-                  build();
-
-        } catch (Throwable t) {
-          throw new RemoteCallException(request.getId(), methodDescriptor, t);
+          final RpcRequest request = (RpcRequest) msg;
+
+          String methodName = request.getMethodName();
+          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+          if (methodDescriptor == null) {
+            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+          }
+          Message paramProto = null;
+          if (request.hasRequestMessage()) {
+            try {
+              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+                  .mergeFrom(request.getRequestMessage()).build();
+
+            } catch (Throwable t) {
+              throw new RemoteCallException(request.getId(), methodDescriptor, t);
+            }
+          }
+          Message returnValue;
+          RpcController controller = new NettyRpcController();
+
+          try {
+            returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
+          } catch (Throwable t) {
+            throw new RemoteCallException(request.getId(), methodDescriptor, t);
+          }
+
+          RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+          if (returnValue != null) {
+            builder.setResponseMessage(returnValue.toByteString());
+          }
+
+          if (controller.failed()) {
+            builder.setErrorMessage(controller.errorText());
+          }
+          ctx.writeAndFlush(builder.build());
+        } finally {
+          ReferenceCountUtil.release(msg);
         }
       }
-      Message returnValue;
-      RpcController controller = new NettyRpcController();
-
-      try {
-        returnValue = service.callBlockingMethod(methodDescriptor,
-            controller, paramProto);
-      } catch (Throwable t) {
-        throw new RemoteCallException(request.getId(), methodDescriptor, t);
-      }
-
-      RpcResponse.Builder builder =
-          RpcResponse.newBuilder().setId(request.getId());
-
-      if (returnValue != null) {
-        builder.setResponseMessage(returnValue.toByteString());
-      }
-
-      if (controller.failed()) {
-        builder.setErrorMessage(controller.errorText());
-      }
-      e.getChannel().write(builder.build());
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      if (e.getCause() instanceof RemoteCallException) {
-        RemoteCallException callException = (RemoteCallException) e.getCause();
-        e.getChannel().write(callException.getResponse());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+      if (cause instanceof RemoteCallException) {
+        ctx.writeAndFlush(((RemoteCallException) cause).getResponse()); // report the failed call back to the client
+      } else {
+        LOG.error(cause.getMessage(), cause); // unexpected failure: log it with its stack trace instead of dropping it
+      }
+      if (ctx != null && ctx.channel().isActive()) {
+        ctx.channel().close();
       }
     }
+    
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
index fd612a5..c4c3256 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -50,14 +50,14 @@ public class CallFuture<T> implements RpcCallback<T>, Future<T> {
 
   @Override
   public boolean cancel(boolean mayInterruptIfRunning) {
-    // TODO - to be implemented
-    throw new UnsupportedOperationException();
+    controller.startCancel();
+    sem.release();
+    return controller.isCanceled();
   }
 
   @Override
   public boolean isCancelled() {
-    // TODO - to be implemented
-    throw new UnsupportedOperationException();
+    return controller.isCanceled();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
index 1bf0ed8..4ba19a5 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
@@ -24,9 +24,13 @@ import com.google.protobuf.RpcController;
 public class DefaultRpcController implements RpcController {
   private String errorText;
   private boolean error;
+  private boolean canceled;
 
   @Override
   public void reset() {
+    errorText = "";
+    error = false;
+    canceled = false;
   }
 
   @Override
@@ -41,6 +45,7 @@ public class DefaultRpcController implements RpcController {
 
   @Override
   public void startCancel() {
+    this.canceled = true;
   }
 
   @Override
@@ -51,7 +56,7 @@ public class DefaultRpcController implements RpcController {
 
   @Override
   public boolean isCanceled() {
-    return false;
+    return canceled;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index bc0c567..7b52178 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,12 +18,16 @@
 
 package org.apache.tajo.rpc;
 
+import io.netty.channel.*;
+
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
@@ -37,7 +41,7 @@ public abstract class NettyClientBase implements Closeable {
   private static final long PAUSE = 1000; // 1 sec
   private int numRetries;
 
-  protected ClientBootstrap bootstrap;
+  protected Bootstrap bootstrap;
   private ChannelFuture channelFuture;
 
   public NettyClientBase() {
@@ -46,55 +50,39 @@ public abstract class NettyClientBase implements Closeable {
   public abstract <T> T getStub();
   public abstract RpcConnectionPool.RpcConnectionKey getKey();
   
-  public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory, 
+  public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer, 
       int numRetries) throws ConnectTimeoutException {
     this.numRetries = numRetries;
     
-    init(addr, pipeFactory, factory);
+    init(addr, initializer);
   }
 
-  public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
+  public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer)
       throws ConnectTimeoutException {
-    this.bootstrap = new ClientBootstrap(factory);
-    this.bootstrap.setPipelineFactory(pipeFactory);
-    // TODO - should be configurable
-    this.bootstrap.setOption("connectTimeoutMillis", 10000);
-    this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
-    this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
-    this.bootstrap.setOption("tcpNoDelay", true);
-    this.bootstrap.setOption("keepAlive", true);
+    this.bootstrap = new Bootstrap();
+    this.bootstrap
+      .channel(NioSocketChannel.class)
+      .handler(initializer)
+      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .option(ChannelOption.SO_REUSEADDR, true)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+      .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
+      .option(ChannelOption.TCP_NODELAY, true);
 
     connect(addr);
   }
+
+  private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+
+    this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
+            .connect(address)
+            .addListener(listener);
+  }
   
   private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
-    this.channelFuture = bootstrap.connect(addr);
-
     final CountDownLatch latch = new CountDownLatch(1);
-    this.channelFuture.addListener(new ChannelFutureListener() {
-      private final AtomicInteger retryCount = new AtomicInteger();
-      
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (!future.isSuccess()) {
-          if (numRetries > retryCount.getAndIncrement()) {
-            Thread.sleep(PAUSE);
-            channelFuture = bootstrap.connect(addr);
-            channelFuture.addListener(this);
-            
-            LOG.debug("Connecting to " + addr + " has been failed. Retrying to connect.");
-          }
-          else {
-            latch.countDown();
-
-            LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
-          }
-        }
-        else {
-          latch.countDown();
-        }
-      }
-    });
+    GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch);
+    connectUsingNetty(addr, listener);
 
     try {
       latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS);
@@ -103,7 +91,7 @@ public abstract class NettyClientBase implements Closeable {
 
     if (!channelFuture.isSuccess()) {
       throw new ConnectTimeoutException("Connect error to " + addr +
-          " caused by " + ExceptionUtils.getMessage(channelFuture.getCause()));
+          " caused by " + ExceptionUtils.getMessage(channelFuture.cause()));
     }
   }
 
@@ -115,34 +103,67 @@ public abstract class NettyClientBase implements Closeable {
     handleConnectionInternally(addr);
   }
 
-  public boolean isConnected() {
-    return getChannel().isConnected();
+  class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
+    private final AtomicInteger retryCount = new AtomicInteger();
+    private final InetSocketAddress address;
+    private final CountDownLatch latch;
+
+    RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
+      this.address = address;
+      this.latch = latch;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture channelFuture) throws Exception {
+      if (!channelFuture.isSuccess()) {
+        channelFuture.channel().close();
+
+        if (numRetries > retryCount.getAndIncrement()) {
+          final GenericFutureListener<ChannelFuture> currentListener = this;
+
+          RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() {
+            @Override
+            public void run() {
+              connectUsingNetty(address, currentListener);
+            }
+          }, PAUSE, TimeUnit.MILLISECONDS);
+
+          LOG.debug("Connecting to " + address + " has been failed. Retrying to connect.");
+        }
+        else {
+          latch.countDown();
+
+          LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+        }
+      }
+      else {
+        latch.countDown();
+      }
+    }
+  }
+
+  public boolean isActive() {
+    return getChannel().isActive();
   }
 
   public InetSocketAddress getRemoteAddress() {
-    if (channelFuture == null || channelFuture.getChannel() == null) {
+    if (channelFuture == null || channelFuture.channel() == null) {
       return null;
     }
-    return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress();
+    return (InetSocketAddress) channelFuture.channel().remoteAddress();
   }
 
   public Channel getChannel() {
-    return channelFuture.getChannel();
+    return channelFuture.channel();
   }
 
   @Override
   public void close() {
-    if(this.channelFuture != null && getChannel().isOpen()) {
-      try {
-        getChannel().close().awaitUninterruptibly();
-      } catch (Throwable ce) {
-        LOG.warn(ce);
-      }
+    if (channelFuture != null && getChannel().isActive()) {
+      getChannel().close();
     }
 
-    if(this.bootstrap != null) {
-      // This line will shutdown the factory
-      // this.bootstrap.releaseExternalResources();
+    if (this.bootstrap != null) {
       InetSocketAddress address = getRemoteAddress();
       if (address != null) {
         LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index ef090ff..1b45ac9 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -20,19 +20,23 @@ package org.apache.tajo.rpc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class NettyServerBase {
@@ -43,10 +47,10 @@ public class NettyServerBase {
   protected String serviceName;
   protected InetSocketAddress serverAddr;
   protected InetSocketAddress bindAddress;
-  protected ChannelPipelineFactory pipelineFactory;
+  protected ChannelInitializer<Channel> initializer;
   protected ServerBootstrap bootstrap;
-  protected Channel channel;
-  protected ChannelGroup accepted = new DefaultChannelGroup();
+  protected ChannelFuture channelFuture;
+  protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
   private InetSocketAddress initIsa;
 
@@ -63,19 +67,19 @@ public class NettyServerBase {
     this.serviceName = name;
   }
 
-  public void init(ChannelPipelineFactory pipeline, int workerNum) {
-    ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
-
-    pipelineFactory = pipeline;
-    bootstrap = new ServerBootstrap(factory);
-    bootstrap.setPipelineFactory(pipelineFactory);
-    // TODO - should be configurable
-    bootstrap.setOption("reuseAddress", true);
-    bootstrap.setOption("child.tcpNoDelay", true);
-    bootstrap.setOption("child.keepAlive", true);
-    bootstrap.setOption("child.connectTimeoutMillis", 10000);
-    bootstrap.setOption("child.connectResponseTimeoutMillis", 10000);
-    bootstrap.setOption("child.receiveBufferSize", 1048576 * 10);
+  public void init(ChannelInitializer<Channel> initializer, int workerNum) {
+    bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
+    // option() configures the listening socket; childOption() configures each accepted connection.
+    this.initializer = initializer;
+    bootstrap
+      .channel(NioServerSocketChannel.class)
+      .childHandler(initializer)
+      .option(ChannelOption.SO_REUSEADDR, true)
+      .childOption(ChannelOption.SO_KEEPALIVE, true)
+      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .childOption(ChannelOption.TCP_NODELAY, true)
+      .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+      .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10);
   }
 
   public InetSocketAddress getListenAddress() {
@@ -98,28 +102,41 @@ public class NettyServerBase {
       serverAddr = initIsa;
     }
 
-    this.channel = bootstrap.bind(serverAddr);
-    this.bindAddress = (InetSocketAddress) channel.getLocalAddress();
+    this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly();
+    this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress();
 
     LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
   }
 
   public Channel getChannel() {
-    return this.channel;
+    return this.channelFuture.channel();
   }
 
   public void shutdown() {
-    if(channel != null) {
-      channel.close().awaitUninterruptibly();
-    }
+    shutdown(false);
+  }
 
+  public void shutdown(boolean waitUntilThreadsStop) {
     try {
-      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+      accepted.close();
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
     }
+
     if(bootstrap != null) {
-      bootstrap.releaseExternalResources();
+      if (bootstrap.childGroup() != null) {
+        bootstrap.childGroup().shutdownGracefully();
+        if (waitUntilThreadsStop) {
+          bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
+        }
+      }
+
+      if (bootstrap.group() != null) {
+        bootstrap.group().shutdownGracefully();
+        if (waitUntilThreadsStop) { // wait for this (boss) group, not the child group again
+          bootstrap.group().terminationFuture().awaitUninterruptibly();
+        }
+      }
     }
 
     if (bindAddress != null) {
@@ -138,13 +155,14 @@ public class NettyServerBase {
   // each system has a different starting port number within the given range.
   private static final AtomicInteger nextPortNum =
       new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange));
+  private static final Object lockObject = new Object();
 
 
   private synchronized static int getUnusedPort() throws IOException {
     while (true) {
       int port = nextPortNum.getAndIncrement();
       if (port >= endPortRange) {
-        synchronized (nextPortNum) {
+        synchronized (lockObject) {
           nextPortNum.set(startPortRange);
           port = nextPortNum.getAndIncrement();
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
index 70135a6..9b7f8ac 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
@@ -20,7 +20,7 @@ package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcCallback;
 
-public class NullCallback implements RpcCallback {
+public class NullCallback implements RpcCallback<Object> {
   private final static NullCallback instance;
 
   static {

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
new file mode 100644
index 0000000..6a340dc
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+
+import com.google.protobuf.MessageLite;
+
+class ProtoChannelInitializer extends ChannelInitializer<Channel> {
+  private final MessageLite defaultInstance;
+  private final ChannelHandler handler;
+
+  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
+    this.handler = handler;
+    this.defaultInstance = defaultInstance;
+  }
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+    ChannelPipeline pipeline = channel.pipeline();
+    pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
+    pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
+    pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
+    pipeline.addLast("protobufEncoder", new ProtobufEncoder());
+    pipeline.addLast("handler", handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java
deleted file mode 100644
index 7aa03e7..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.MessageLite;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
-public class ProtoPipelineFactory implements ChannelPipelineFactory {
-  private final ChannelUpstreamHandler handler;
-  private final MessageLite defaultInstance;
-
-  public ProtoPipelineFactory(ChannelUpstreamHandler handlerFactory,
-      MessageLite defaultInstance) {
-    this.handler = handlerFactory;
-    this.defaultInstance = defaultInstance;
-  }
-
-  public ChannelPipeline getPipeline() throws Exception {
-    ChannelPipeline p = Channels.pipeline();
-    p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
-    p.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
-    p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
-    p.addLast("protobufEncoder", new ProtobufEncoder());
-    p.addLast("handler", handler);
-    return p;
-  }
-}