You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/10/10 21:47:13 UTC

[hadoop] branch branch-3.3.5 updated: HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new 0d067f69da7 HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)
0d067f69da7 is described below

commit 0d067f69da76efcb90a7bb3a1c003c9498811d24
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Mon Oct 10 22:27:43 2022 +0100

    HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)
    
    Reviewed-by: Tsz-Wo Nicholas Sze <sz...@apache.org>
    
    Co-authored-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit 6847ec0647c3063bcbf9cf0315a77e247cae8534)
---
 hadoop-common-project/hadoop-nfs/pom.xml           |   2 +-
 .../java/org/apache/hadoop/mount/MountdBase.java   |  14 ++-
 .../java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java  |   7 +-
 .../apache/hadoop/oncrpc/RegistrationClient.java   |  13 +--
 .../java/org/apache/hadoop/oncrpc/RpcInfo.java     |  12 +-
 .../java/org/apache/hadoop/oncrpc/RpcProgram.java  |  19 ++--
 .../java/org/apache/hadoop/oncrpc/RpcResponse.java |  23 ++--
 .../java/org/apache/hadoop/oncrpc/RpcUtil.java     | 123 +++++++++++---------
 .../org/apache/hadoop/oncrpc/SimpleTcpClient.java  |  78 ++++++++-----
 .../hadoop/oncrpc/SimpleTcpClientHandler.java      |  30 ++---
 .../org/apache/hadoop/oncrpc/SimpleTcpServer.java  |  76 +++++++------
 .../org/apache/hadoop/oncrpc/SimpleUdpServer.java  |  65 +++++++----
 .../main/java/org/apache/hadoop/oncrpc/XDR.java    |  12 +-
 .../java/org/apache/hadoop/portmap/Portmap.java    | 126 +++++++++++++--------
 .../apache/hadoop/portmap/RpcProgramPortmap.java   |  46 ++++----
 .../org/apache/hadoop/oncrpc/TestFrameDecoder.java | 100 ++++++++--------
 .../org/apache/hadoop/portmap/TestPortmap.java     |   2 +-
 hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml        |   2 +-
 .../hadoop/hdfs/nfs/mount/RpcProgramMountd.java    |  12 +-
 .../org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java |  12 +-
 .../apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java   |   2 +-
 .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java       |  14 ++-
 .../org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java  |   2 +-
 .../apache/hadoop/hdfs/nfs/nfs3/WriteManager.java  |   2 +-
 .../hadoop/hdfs/nfs/TestOutOfOrderWrite.java       |  32 +++---
 .../hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java   |   2 +-
 .../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java    |   2 +-
 27 files changed, 472 insertions(+), 358 deletions(-)

diff --git a/hadoop-common-project/hadoop-nfs/pom.xml b/hadoop-common-project/hadoop-nfs/pom.xml
index cdda01fc640..ca4b97a3f62 100644
--- a/hadoop-common-project/hadoop-nfs/pom.xml
+++ b/hadoop-common-project/hadoop-nfs/pom.xml
@@ -90,7 +90,7 @@
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
       <scope>compile</scope>
     </dependency>
     <dependency>
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java
index 0ff3084bf3e..58d3e51f2bd 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java
@@ -41,6 +41,8 @@ abstract public class MountdBase {
   private final RpcProgram rpcProgram;
   private int udpBoundPort; // Will set after server starts
   private int tcpBoundPort; // Will set after server starts
+  private SimpleUdpServer udpServer = null;
+  private SimpleTcpServer tcpServer = null;
 
   public RpcProgram getRpcProgram() {
     return rpcProgram;
@@ -57,7 +59,7 @@ abstract public class MountdBase {
 
   /* Start UDP server */
   private void startUDPServer() {
-    SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
+    udpServer = new SimpleUdpServer(rpcProgram.getPort(),
         rpcProgram, 1);
     rpcProgram.startDaemons();
     try {
@@ -76,7 +78,7 @@ abstract public class MountdBase {
 
   /* Start TCP server */
   private void startTCPServer() {
-    SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
+    tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
         rpcProgram, 1);
     rpcProgram.startDaemons();
     try {
@@ -118,6 +120,14 @@ abstract public class MountdBase {
       rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
       tcpBoundPort = 0;
     }
+    if (udpServer != null) {
+      udpServer.shutdown();
+      udpServer = null;
+    }
+    if (tcpServer != null) {
+      tcpServer.shutdown();
+      tcpServer = null;
+    }
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
index ff83a5f19be..e6ea29b42bf 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
@@ -35,6 +35,7 @@ public abstract class Nfs3Base {
   public static final Logger LOG = LoggerFactory.getLogger(Nfs3Base.class);
   private final RpcProgram rpcProgram;
   private int nfsBoundPort; // Will set after server starts
+  private SimpleTcpServer tcpServer = null;
 
   public RpcProgram getRpcProgram() {
     return rpcProgram;
@@ -61,7 +62,7 @@ public abstract class Nfs3Base {
   }
 
   private void startTCPServer() {
-    SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
+    tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
         rpcProgram, 0);
     rpcProgram.startDaemons();
     try {
@@ -84,6 +85,10 @@ public abstract class Nfs3Base {
       nfsBoundPort = 0;
     }
     rpcProgram.stopDaemons();
+    if (tcpServer != null) {
+      tcpServer.shutdown();
+      tcpServer = null;
+    }
   }
   /**
    * Priority of the nfsd shutdown hook.
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
index c8528ba4d55..c96f1d53bb4 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
@@ -19,10 +19,9 @@ package org.apache.hadoop.oncrpc;
 
 import java.util.Arrays;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,10 +57,10 @@ public class RegistrationClient extends SimpleTcpClient {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-      ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      ByteBuf buf = (ByteBuf) msg; // Read reply
       if (!validMessageLength(buf.readableBytes())) {
-        e.getChannel().close();
+        ctx.channel().close();
         return;
       }
 
@@ -83,7 +82,7 @@ public class RegistrationClient extends SimpleTcpClient {
         RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
         handle(deniedReply);
       }
-      e.getChannel().close(); // shutdown now that request is complete
+      ctx.channel().close(); // shutdown now that request is complete
     }
 
     private void handle(RpcDeniedReply deniedReply) {
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
index b434d79285c..aba8e9ea262 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.oncrpc;
 
 import java.net.SocketAddress;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 
 /**
  * RpcInfo records all contextual information of an RPC message. It contains
@@ -29,11 +29,11 @@ import org.jboss.netty.channel.ChannelHandlerContext;
  */
 public final class RpcInfo {
   private final RpcMessage header;
-  private final ChannelBuffer data;
+  private final ByteBuf data;
   private final Channel channel;
   private final SocketAddress remoteAddress;
 
-  public RpcInfo(RpcMessage header, ChannelBuffer data,
+  public RpcInfo(RpcMessage header, ByteBuf data,
       ChannelHandlerContext channelContext, Channel channel,
       SocketAddress remoteAddress) {
     this.header = header;
@@ -46,7 +46,7 @@ public final class RpcInfo {
     return header;
   }
 
-  public ChannelBuffer data() {
+  public ByteBuf data() {
     return data;
   }
 
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
index bafb49716b6..8b8d558255f 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
@@ -22,16 +22,15 @@ import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * Class for writing RPC server programs based on RFC 1050. Extend this class
  * and implement {@link #handleInternal} to handle the requests received.
  */
-public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
+public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
   static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class);
   public static final int RPCB_PORT = 111;
   private final String program;
@@ -161,9 +160,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
   public void stopDaemons() {}
   
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
       throws Exception {
-    RpcInfo info = (RpcInfo) e.getMessage();
+    RpcInfo info = (RpcInfo) msg;
     RpcCall call = (RpcCall) info.header();
     
     SocketAddress remoteAddress = info.remoteAddress();
@@ -221,7 +220,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
       out.writeInt(lowProgVersion);
       out.writeInt(highProgVersion);
     }
-    ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+    ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
         .buffer());
     RpcResponse rsp = new RpcResponse(b, remoteAddress);
     RpcUtil.sendRpcResponse(ctx, rsp);
@@ -234,7 +233,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
         RpcReply.ReplyState.MSG_DENIED,
         RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
     reply.write(out);
-    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+    ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
         .buffer());
     RpcResponse rsp = new RpcResponse(buf, remoteAddress);
     RpcUtil.sendRpcResponse(ctx, rsp);
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
index 2e45e6100b1..0d6431f68bd 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
@@ -19,27 +19,30 @@ package org.apache.hadoop.oncrpc;
 
 import java.net.SocketAddress;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.DefaultAddressedEnvelope;
 
 /**
  * RpcResponse encapsulates a response to a RPC request. It contains the data
  * that is going to cross the wire, as well as the information of the remote
  * peer.
  */
-public class RpcResponse {
-  private final ChannelBuffer data;
-  private final SocketAddress remoteAddress;
+public class RpcResponse extends
+    DefaultAddressedEnvelope<ByteBuf, SocketAddress> {
+  public RpcResponse(ByteBuf message, SocketAddress recipient) {
+    super(message, recipient, null);
+  }
 
-  public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
-    this.data = data;
-    this.remoteAddress = remoteAddress;
+  public RpcResponse(ByteBuf message, SocketAddress recipient,
+      SocketAddress sender) {
+    super(message, recipient, sender);
   }
 
-  public ChannelBuffer data() {
-    return data;
+  public ByteBuf data() {
+    return this.content();
   }
 
   public SocketAddress remoteAddress() {
-    return remoteAddress;
+    return this.recipient();
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
index cebebd27d0c..e8bc27d687f 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
@@ -17,16 +17,18 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,16 +45,16 @@ public final class RpcUtil {
 
   public static void sendRpcResponse(ChannelHandlerContext ctx,
       RpcResponse response) {
-    Channels.fireMessageReceived(ctx, response);
+    ctx.fireChannelRead(response);
   }
 
-  public static FrameDecoder constructRpcFrameDecoder() {
+  public static ByteToMessageDecoder constructRpcFrameDecoder() {
     return new RpcFrameDecoder();
   }
 
-  public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
-  public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
-  public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+  public static final ChannelInboundHandlerAdapter STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+  public static final ChannelInboundHandlerAdapter STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+  public static final ChannelInboundHandlerAdapter STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
 
   /**
    * An RPC client can separate a RPC message into several frames (i.e.,
@@ -62,44 +64,39 @@ public final class RpcUtil {
    * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
    * each RPC client.
    */
-  static class RpcFrameDecoder extends FrameDecoder {
+  static class RpcFrameDecoder extends ByteToMessageDecoder {
     public static final Logger LOG =
         LoggerFactory.getLogger(RpcFrameDecoder.class);
-    private ChannelBuffer currentFrame;
+    private volatile boolean isLast;
 
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel,
-        ChannelBuffer buf) {
+    protected void decode(ChannelHandlerContext ctx, ByteBuf buf,
+        List<Object> out) {
 
-      if (buf.readableBytes() < 4)
-        return null;
+      if (buf.readableBytes() < 4) {
+        return;
+      }
 
       buf.markReaderIndex();
 
       byte[] fragmentHeader = new byte[4];
       buf.readBytes(fragmentHeader);
       int length = XDR.fragmentSize(fragmentHeader);
-      boolean isLast = XDR.isLastFragment(fragmentHeader);
+      isLast = XDR.isLastFragment(fragmentHeader);
 
       if (buf.readableBytes() < length) {
         buf.resetReaderIndex();
-        return null;
+        return;
       }
 
-      ChannelBuffer newFragment = buf.readSlice(length);
-      if (currentFrame == null) {
-        currentFrame = newFragment;
-      } else {
-        currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
-      }
+      ByteBuf newFragment = buf.readSlice(length);
+      newFragment.retain();
+      out.add(newFragment);
+    }
 
-      if (isLast) {
-        ChannelBuffer completeFrame = currentFrame;
-        currentFrame = null;
-        return completeFrame;
-      } else {
-        return null;
-      }
+    @VisibleForTesting
+    public boolean isLast() {
+      return isLast;
     }
   }
 
@@ -107,30 +104,44 @@ public final class RpcUtil {
    * RpcMessageParserStage parses the network bytes and encapsulates the RPC
    * request into a RpcInfo instance.
    */
-  static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  static final class RpcMessageParserStage extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory
         .getLogger(RpcMessageParserStage.class);
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-      ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+      ByteBuf buf;
+      SocketAddress remoteAddress;
+      if (msg instanceof DatagramPacket) {
+        DatagramPacket packet = (DatagramPacket)msg;
+        buf = packet.content();
+        remoteAddress = packet.sender();
+      } else {
+        buf = (ByteBuf) msg;
+        remoteAddress = ctx.channel().remoteAddress();
+      }
+
+      ByteBuffer b = buf.nioBuffer().asReadOnlyBuffer();
       XDR in = new XDR(b, XDR.State.READING);
 
       RpcInfo info = null;
       try {
         RpcCall callHeader = RpcCall.read(in);
-        ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+        ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
             .slice());
-        info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
-            e.getRemoteAddress());
+
+        info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
+            remoteAddress);
       } catch (Exception exc) {
-        LOG.info("Malformed RPC request from " + e.getRemoteAddress());
+        LOG.info("Malformed RPC request from " + remoteAddress);
+      } finally {
+        buf.release();
       }
 
       if (info != null) {
-        Channels.fireMessageReceived(ctx, info);
+        ctx.fireChannelRead(info);
       }
     }
   }
@@ -139,16 +150,17 @@ public final class RpcUtil {
    * RpcTcpResponseStage sends an RpcResponse across the wire with the
    * appropriate fragment header.
    */
-  private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  private static class RpcTcpResponseStage extends ChannelInboundHandlerAdapter {
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      RpcResponse r = (RpcResponse) e.getMessage();
+      RpcResponse r = (RpcResponse) msg;
       byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
-      ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
-      ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
-      e.getChannel().write(d);
+      ByteBuf header = Unpooled.wrappedBuffer(fragmentHeader);
+      ByteBuf d = Unpooled.wrappedBuffer(header, r.data());
+      ctx.channel().writeAndFlush(d);
     }
   }
 
@@ -156,14 +168,17 @@ public final class RpcUtil {
    * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
    * require a fragment header.
    */
+  @ChannelHandler.Sharable
   private static final class RpcUdpResponseStage extends
-      SimpleChannelUpstreamHandler {
+      ChannelInboundHandlerAdapter {
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      RpcResponse r = (RpcResponse) e.getMessage();
-      e.getChannel().write(r.data(), r.remoteAddress());
+      RpcResponse r = (RpcResponse) msg;
+      // TODO: check out https://github.com/netty/netty/issues/1282 for
+      // correct usage
+      ctx.channel().writeAndFlush(r.data());
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
index 32e1b4b8392..7cfef6439b0 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
@@ -18,15 +18,16 @@
 package org.apache.hadoop.oncrpc;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
 
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * A simple TCP based RPC client which just sends a request to a server.
@@ -35,8 +36,9 @@ public class SimpleTcpClient {
   protected final String host;
   protected final int port;
   protected final XDR request;
-  protected ChannelPipelineFactory pipelineFactory;
   protected final boolean oneShot;
+  private NioEventLoopGroup workerGroup;
+  private ChannelFuture future;
   
   public SimpleTcpClient(String host, int port, XDR request) {
     this(host,port, request, true);
@@ -48,40 +50,54 @@ public class SimpleTcpClient {
     this.request = request;
     this.oneShot = oneShot;
   }
-  
-  protected ChannelPipelineFactory setPipelineFactory() {
-    this.pipelineFactory = new ChannelPipelineFactory() {
+
+  protected ChannelInitializer<SocketChannel> setChannelHandler() {
+    return new ChannelInitializer<SocketChannel>() {
       @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(
+      protected void initChannel(SocketChannel ch) throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(
             RpcUtil.constructRpcFrameDecoder(),
-            new SimpleTcpClientHandler(request));
+            new SimpleTcpClientHandler(request)
+        );
       }
     };
-    return this.pipelineFactory;
   }
 
+  @VisibleForTesting
   public void run() {
     // Configure the client.
-    ChannelFactory factory = new NioClientSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
-    ClientBootstrap bootstrap = new ClientBootstrap(factory);
-
-    // Set up the pipeline factory.
-    bootstrap.setPipelineFactory(setPipelineFactory());
-
-    bootstrap.setOption("tcpNoDelay", true);
-    bootstrap.setOption("keepAlive", true);
+    workerGroup = new NioEventLoopGroup();
+    Bootstrap bootstrap = new Bootstrap()
+        .group(workerGroup)
+        .channel(NioSocketChannel.class);
 
-    // Start the connection attempt.
-    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+    try {
+      future = bootstrap.handler(setChannelHandler())
+          .option(ChannelOption.TCP_NODELAY, true)
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .connect(new InetSocketAddress(host, port)).sync();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      if (oneShot) {
+        stop();
+      }
+    }
+  }
 
-    if (oneShot) {
-      // Wait until the connection is closed or the connection attempt fails.
-      future.getChannel().getCloseFuture().awaitUninterruptibly();
+  public void stop() {
+    try {
+      if (future != null) {
+        // Wait until the connection is closed or the connection attempt fails.
+        future.channel().closeFuture().sync();
+      }
 
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
       // Shut down thread pools to exit.
-      bootstrap.releaseExternalResources();
+      workerGroup.shutdownGracefully();
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
index 23b6682361c..1acefc857f8 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
@@ -17,19 +17,19 @@
  */
 package org.apache.hadoop.oncrpc;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
  */
-public class SimpleTcpClientHandler extends SimpleChannelHandler {
+public class SimpleTcpClientHandler extends ChannelInboundHandlerAdapter {
   public static final Logger LOG =
       LoggerFactory.getLogger(SimpleTcpClient.class);
   protected final XDR request;
@@ -39,13 +39,13 @@ public class SimpleTcpClientHandler extends SimpleChannelHandler {
   }
 
   @Override
-  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
     // Send the request
     if (LOG.isDebugEnabled()) {
       LOG.debug("sending PRC request");
     }
-    ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
-    e.getChannel().write(outBuf);
+    ByteBuf outBuf = XDR.writeMessageTcp(request, true);
+    ctx.channel().writeAndFlush(outBuf);
   }
 
   /**
@@ -53,13 +53,13 @@ public class SimpleTcpClientHandler extends SimpleChannelHandler {
    * more interaction with the server.
    */
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-    e.getChannel().close();
+  public void channelRead(ChannelHandlerContext ctx, Object msg) {
+    ctx.channel().close();
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    LOG.warn("Unexpected exception from downstream: ", e.getCause());
-    e.getChannel().close();
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.warn("Unexpected exception from downstream: ", cause.getCause());
+    ctx.channel().close();
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
index 177fa3d80b1..29155c80b18 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
@@ -20,14 +20,17 @@ package org.apache.hadoop.oncrpc;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,9 +42,11 @@ public class SimpleTcpServer {
       LoggerFactory.getLogger(SimpleTcpServer.class);
   protected final int port;
   protected int boundPort = -1; // Will be set after server starts
-  protected final SimpleChannelUpstreamHandler rpcProgram;
+  protected final ChannelInboundHandlerAdapter rpcProgram;
   private ServerBootstrap server;
   private Channel ch;
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
 
   /** The maximum number of I/O worker threads */
   protected final int workerCount;
@@ -57,37 +62,32 @@ public class SimpleTcpServer {
     this.workerCount = workercount;
   }
 
-  public void run() {
+  public void run() throws InterruptedException {
     // Configure the Server.
-    ChannelFactory factory;
-    if (workerCount == 0) {
-      // Use default workers: 2 * the number of available processors
-      factory = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-    } else {
-      factory = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-          workerCount);
-    }
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool());
 
-    server = new ServerBootstrap(factory);
-    server.setPipelineFactory(new ChannelPipelineFactory() {
+    server = new ServerBootstrap();
 
+    server.group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+      protected void initChannel(SocketChannel ch) throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(RpcUtil.constructRpcFrameDecoder(),
             RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
             RpcUtil.STAGE_RPC_TCP_RESPONSE);
-      }
-    });
-    server.setOption("child.tcpNoDelay", true);
-    server.setOption("child.keepAlive", true);
-    server.setOption("child.reuseAddress", true);
-    server.setOption("reuseAddress", true);
+      }})
+        .childOption(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true)
+        .option(ChannelOption.SO_REUSEADDR, true);
 
     // Listen to TCP port
-    ch = server.bind(new InetSocketAddress(port));
-    InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
+    ChannelFuture f = server.bind(new InetSocketAddress(port)).sync();
+    ch = f.channel();
+    InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress();
     boundPort = socketAddr.getPort();
 
     LOG.info("Started listening to TCP requests at port " + boundPort + " for "
@@ -102,9 +102,17 @@ public class SimpleTcpServer {
   public void shutdown() {
     if (ch != null) {
       ch.close().awaitUninterruptibly();
+      ch = null;
+    }
+
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
-    if (server != null) {
-      server.releaseExternalResources();
+
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+      bossGroup = null;
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
index e65003ca64b..516503c323a 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
@@ -20,12 +20,16 @@ package org.apache.hadoop.oncrpc;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.DatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,36 +43,45 @@ public class SimpleUdpServer {
   private final int RECEIVE_BUFFER_SIZE = 65536;
 
   protected final int port;
-  protected final SimpleChannelUpstreamHandler rpcProgram;
+  protected final ChannelInboundHandlerAdapter rpcProgram;
   protected final int workerCount;
   protected int boundPort = -1; // Will be set after server starts
-  private ConnectionlessBootstrap server;
+  private Bootstrap server;
   private Channel ch;
+  private EventLoopGroup workerGroup;
 
-  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
+  public SimpleUdpServer(int port, ChannelInboundHandlerAdapter program,
       int workerCount) {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workerCount;
   }
 
-  public void run() {
-    // Configure the client.
-    DatagramChannelFactory f = new NioDatagramChannelFactory(
-        Executors.newCachedThreadPool(), workerCount);
+  public void run() throws InterruptedException {
+    workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool());
 
-    server = new ConnectionlessBootstrap(f);
-    server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
-        rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
-
-    server.setOption("broadcast", "false");
-    server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
-    server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
-    server.setOption("reuseAddress", true);
+    server = new Bootstrap();
+    server.group(workerGroup)
+        .channel(NioDatagramChannel.class)
+        .option(ChannelOption.SO_BROADCAST, false)
+        .option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE)
+        .option(ChannelOption.SO_RCVBUF, RECEIVE_BUFFER_SIZE)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .handler(new ChannelInitializer<NioDatagramChannel>() {
+          @Override protected void initChannel(NioDatagramChannel ch)
+              throws Exception {
+            ChannelPipeline p = ch.pipeline();
+            p.addLast(
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER,
+                rpcProgram,
+                RpcUtil.STAGE_RPC_UDP_RESPONSE);
+          }
+        });
 
     // Listen to the UDP port
-    ch = server.bind(new InetSocketAddress(port));
-    InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
+    ChannelFuture f = server.bind(new InetSocketAddress(port)).sync();
+    ch = f.channel();
+    InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress();
     boundPort = socketAddr.getPort();
 
     LOG.info("Started listening to UDP requests at port " + boundPort + " for "
@@ -83,9 +96,11 @@ public class SimpleUdpServer {
   public void shutdown() {
     if (ch != null) {
       ch.close().awaitUninterruptibly();
+      ch = null;
     }
-    if (server != null) {
-      server.releaseExternalResources();
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
index 419eff831f0..6000fd57a1b 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.oncrpc;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -242,7 +242,7 @@ public final class XDR {
    * @param last specifies last request or not
    * @return TCP buffer
    */
-  public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+  public static ByteBuf writeMessageTcp(XDR request, boolean last) {
     Preconditions.checkState(request.state == XDR.State.WRITING);
     ByteBuffer b = request.buf.duplicate();
     b.flip();
@@ -250,7 +250,7 @@ public final class XDR {
     ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
 
     // TODO: Investigate whether making a copy of the buffer is necessary.
-    return ChannelBuffers.copiedBuffer(headerBuf, b);
+    return Unpooled.wrappedBuffer(headerBuf, b);
   }
 
   /**
@@ -258,10 +258,10 @@ public final class XDR {
    * @param response XDR response
    * @return UDP buffer
    */
-  public static ChannelBuffer writeMessageUdp(XDR response) {
+  public static ByteBuf writeMessageUdp(XDR response) {
     Preconditions.checkState(response.state == XDR.State.READING);
     // TODO: Investigate whether making a copy of the buffer is necessary.
-    return ChannelBuffers.copiedBuffer(response.buf);
+    return Unpooled.copiedBuffer(response.buf);
   }
 
   public static int fragmentSize(byte[] mark) {
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
index 80f43828ea8..1a8a305436c 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
@@ -22,21 +22,27 @@ import java.net.SocketAddress;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.util.StringUtils;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -49,11 +55,17 @@ final class Portmap {
   private static final Logger LOG = LoggerFactory.getLogger(Portmap.class);
   private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000;
 
-  private ConnectionlessBootstrap udpServer;
+  private Bootstrap udpServer;
   private ServerBootstrap tcpServer;
-  private ChannelGroup allChannels = new DefaultChannelGroup();
+  private ChannelGroup allChannels = new DefaultChannelGroup(
+      GlobalEventExecutor.INSTANCE);
   private Channel udpChannel;
   private Channel tcpChannel;
+
+  EventLoopGroup bossGroup;
+  EventLoopGroup workerGroup;
+  EventLoopGroup udpGroup;
+
   private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels);
 
   public static void main(String[] args) {
@@ -73,18 +85,19 @@ final class Portmap {
 
   void shutdown() {
     allChannels.close().awaitUninterruptibly();
-    tcpServer.releaseExternalResources();
-    udpServer.releaseExternalResources();
+    bossGroup.shutdownGracefully();
+    workerGroup.shutdownGracefully();
+    udpGroup.shutdownGracefully();
   }
 
   @VisibleForTesting
   SocketAddress getTcpServerLocalAddress() {
-    return tcpChannel.getLocalAddress();
+    return tcpChannel.localAddress();
   }
 
   @VisibleForTesting
   SocketAddress getUdpServerLoAddress() {
-    return udpChannel.getLocalAddress();
+    return udpChannel.localAddress();
   }
 
   @VisibleForTesting
@@ -93,38 +106,55 @@ final class Portmap {
   }
 
   void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
-      final SocketAddress udpAddress) {
-
-    tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-    tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
-      private final HashedWheelTimer timer = new HashedWheelTimer();
-      private final IdleStateHandler idleStateHandler = new IdleStateHandler(
-          timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
-            RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
-            RpcUtil.STAGE_RPC_TCP_RESPONSE);
-      }
-    });
-    tcpServer.setOption("reuseAddress", true);
-    tcpServer.setOption("child.reuseAddress", true);
-
-    udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
-        Executors.newCachedThreadPool()));
-
-    udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
-        handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
-    udpServer.setOption("reuseAddress", true);
-
-    tcpChannel = tcpServer.bind(tcpAddress);
-    udpChannel = udpServer.bind(udpAddress);
+      final SocketAddress udpAddress) throws InterruptedException {
+
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    tcpServer = new ServerBootstrap();
+    tcpServer.group(bossGroup, workerGroup)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            // Netty 4's IdleStateHandler is stateful and not @Sharable, so
+            // every accepted channel gets its own instance; reusing one
+            // instance makes addLast() fail from the second connection on.
+            IdleStateHandler idleStateHandler = new IdleStateHandler(
+                0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
+            ch.pipeline().addLast(RpcUtil.constructRpcFrameDecoder(),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+                RpcUtil.STAGE_RPC_TCP_RESPONSE);
+          }});
+
+    udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+    udpServer = new Bootstrap();
+    udpServer.group(udpGroup)
+        .channel(NioDatagramChannel.class)
+        .handler(new ChannelInitializer<NioDatagramChannel>() {
+          @Override protected void initChannel(NioDatagramChannel ch)
+              throws Exception {
+            ChannelPipeline p = ch.pipeline();
+            p.addLast(
+                new LoggingHandler(LogLevel.DEBUG),
+                RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, RpcUtil.STAGE_RPC_UDP_RESPONSE);
+          }
+        })
+        .option(ChannelOption.SO_REUSEADDR, true);
+
+    // Issue both binds before waiting so the TCP and UDP binds overlap.
+    ChannelFuture tcpChannelFuture = tcpServer.bind(tcpAddress);
+    ChannelFuture udpChannelFuture = udpServer.bind(udpAddress);
+    tcpChannel = tcpChannelFuture.sync().channel();
+    udpChannel = udpChannelFuture.sync().channel();
+
     allChannels.add(tcpChannel);
     allChannels.add(udpChannel);
 
-    LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress()
-        + ", udp://" + udpChannel.getLocalAddress());
+    LOG.info("Portmap server started at tcp://" + tcpChannel.localAddress()
+        + ", udp://" + udpChannel.localAddress());
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
index 0bc380f614c..7b33a644fbe 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
@@ -19,6 +19,14 @@ package org.apache.hadoop.portmap;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.RpcInfo;
@@ -27,20 +35,12 @@ import org.apache.hadoop.oncrpc.RpcResponse;
 import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.timeout.IdleState;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
+@ChannelHandler.Sharable
+final class RpcProgramPortmap extends IdleStateHandler {
   static final int PROGRAM = 100000;
   static final int VERSION = 2;
 
@@ -60,6 +60,8 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
   private final ChannelGroup allChannels;
 
   RpcProgramPortmap(ChannelGroup allChannels) {
+    super(1, 1, 1);
+    // FIXME: set default idle timeout 1 second.
     this.allChannels = allChannels;
     PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
         PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
@@ -151,14 +153,14 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
       throws Exception {
 
-    RpcInfo info = (RpcInfo) e.getMessage();
+    RpcInfo info = (RpcInfo) msg;
     RpcCall rpcCall = (RpcCall) info.header();
     final int portmapProc = rpcCall.getProcedure();
     int xid = rpcCall.getXid();
-    XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
+    XDR in = new XDR(info.data().nioBuffer().asReadOnlyBuffer(),
         XDR.State.READING);
     XDR out = new XDR();
 
@@ -181,29 +183,29 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
       reply.write(out);
     }
 
-    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+    ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
         .buffer());
     RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
 
   @Override
-  public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+  public void channelActive(ChannelHandlerContext ctx)
       throws Exception {
-    allChannels.add(e.getChannel());
+    allChannels.add(ctx.channel());
   }
 
   @Override
   public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
       throws Exception {
-    if (e.getState() == IdleState.ALL_IDLE) {
-      e.getChannel().close();
+    if (e.state() == IdleState.ALL_IDLE) {
+      ctx.channel().close();
     }
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    LOG.warn("Encountered ", e.getCause());
-    e.getChannel().close();
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
+    LOG.warn("Encountered ", t);
+    ctx.channel().close();
   }
 }
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
index 0e416b3738d..6d103fdd781 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
@@ -22,19 +22,19 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelHandlerContext;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.event.Level;
@@ -55,6 +55,7 @@ public class TestFrameDecoder {
     tcpClient.run();
   }
 
+  @ChannelHandler.Sharable
   static class TestRpcProgram extends RpcProgram {
 
     protected TestRpcProgram(String program, String host, int port,
@@ -83,7 +84,7 @@ public class TestFrameDecoder {
           new VerifierNone());
       XDR out = new XDR();
       reply.write(out);
-      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+      ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
       RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
       RpcUtil.sendRpcResponse(ctx, rsp);
     }
@@ -99,13 +100,14 @@ public class TestFrameDecoder {
     RpcFrameDecoder decoder = new RpcFrameDecoder();
 
     // Test "Length field is not received yet"
-    ByteBuffer buffer = ByteBuffer.allocate(1);
-    ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
-    ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
-        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
-        buf);
-    assertTrue(channelBuffer == null);
+    ByteBuf buf = Unpooled.directBuffer(1);
+    List<Object> outputBufs = new ArrayList<>();
+    decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), buf,
+        outputBufs);
+    assertTrue(outputBufs.isEmpty());
 
+    decoder = new RpcFrameDecoder();
     // Test all bytes are not received yet
     byte[] fragment = new byte[4 + 9];
     fragment[0] = (byte) (1 << 7); // final fragment
@@ -114,15 +116,16 @@ public class TestFrameDecoder {
     fragment[3] = (byte) 10; // fragment size = 10 bytes
     assertTrue(XDR.isLastFragment(fragment));
     assertTrue(XDR.fragmentSize(fragment)==10);
+    buf.release();
 
-    buffer = ByteBuffer.allocate(4 + 9);
-    buffer.put(fragment);
-    buffer.flip();
-    buf = new ByteBufferBackedChannelBuffer(buffer);
-    channelBuffer = (ChannelBuffer) decoder.decode(
-        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
-        buf);
-    assertTrue(channelBuffer == null);
+    buf = Unpooled.directBuffer(4 + 9);
+    buf.writeBytes(fragment);
+    outputBufs = new ArrayList<>();
+    decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), buf,
+        outputBufs);
+    assertTrue(decoder.isLast());
+    buf.release();
   }
   
   @Test
@@ -137,16 +140,15 @@ public class TestFrameDecoder {
     fragment1[3] = (byte) 10; // fragment size = 10 bytes
     assertFalse(XDR.isLastFragment(fragment1));
     assertTrue(XDR.fragmentSize(fragment1)==10);
+
+    List<Object> outputBufs = new ArrayList<>();
     
     // decoder should wait for the final fragment
-    ByteBuffer buffer = ByteBuffer.allocate(4 + 10);    
-    buffer.put(fragment1);
-    buffer.flip();
-    ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
-    ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
-        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
-        buf);
-    assertTrue(channelBuffer == null);
+    ByteBuf buf = Unpooled.directBuffer(4 + 10, 4 + 10);
+    buf.writeBytes(fragment1);
+    decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), buf,
+        outputBufs);
 
     byte[] fragment2 = new byte[4 + 10];
     fragment2[0] = (byte) (1 << 7); // final fragment
@@ -155,21 +157,22 @@ public class TestFrameDecoder {
     fragment2[3] = (byte) 10; // fragment size = 10 bytes
     assertTrue(XDR.isLastFragment(fragment2));
     assertTrue(XDR.fragmentSize(fragment2)==10);
+    buf.release();
 
-    buffer = ByteBuffer.allocate(4 + 10);
-    buffer.put(fragment2);
-    buffer.flip();
-    buf = new ByteBufferBackedChannelBuffer(buffer);
-    channelBuffer = (ChannelBuffer) decoder.decode(
-        Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
-        buf);
-    assertTrue(channelBuffer != null);
-    // Complete frame should have to total size 10+10=20
-    assertEquals(20, channelBuffer.readableBytes());
+    buf = Unpooled.directBuffer(4 + 10, 4 + 10);
+    buf.writeBytes(fragment2);
+    decoder.decode(
+        Mockito.mock(ChannelHandlerContext.class), buf,
+        outputBufs);
+    // Expect two completed frames each 10 bytes
+    assertTrue(decoder.isLast());
+    assertEquals(2, outputBufs.size());
+    outputBufs.forEach(b -> assertEquals(10, ((ByteBuf) b).readableBytes()));
+    buf.release();
   }
 
   @Test
-  public void testFrames() {
+  public void testFrames() throws InterruptedException {
     int serverPort = startRpcServer(true);
 
     XDR xdrOut = createGetportMount();
@@ -187,7 +190,7 @@ public class TestFrameDecoder {
   }
   
   @Test
-  public void testUnprivilegedPort() {
+  public void testUnprivilegedPort() throws InterruptedException {
     // Don't allow connections from unprivileged ports. Given that this test is
     // presumably not being run by root, this will be the case.
     int serverPort = startRpcServer(false);
@@ -218,23 +221,28 @@ public class TestFrameDecoder {
     assertEquals(requestSize, resultSize);
   }
   
-  private static int startRpcServer(boolean allowInsecurePorts) {
+  private static int startRpcServer(boolean allowInsecurePorts)
+      throws InterruptedException {
     Random rand = new Random();
     int serverPort = 30000 + rand.nextInt(10000);
     int retries = 10;    // A few retries in case initial choice is in use.
 
     while (true) {
+      SimpleTcpServer tcpServer = null;
       try {
         RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
             "localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
-        SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1);
+        tcpServer = new SimpleTcpServer(serverPort, program, 1);
         tcpServer.run();
         break;          // Successfully bound a port, break out.
-      } catch (ChannelException ce) {
+      } catch (InterruptedException | ChannelException e) {
+        if (tcpServer != null) {
+          tcpServer.shutdown();
+        }
         if (retries-- > 0) {
           serverPort += rand.nextInt(20); // Port in use? Try another.
         } else {
-          throw ce;     // Out of retries.
+          throw e;     // Out of retries.
         }
       }
     }
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
index 6941c4a04e9..8ebf9d03c6c 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
@@ -43,7 +43,7 @@ public class TestPortmap {
   private int xid;
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws InterruptedException {
     pm.start(SHORT_TIMEOUT_MILLISECONDS, new InetSocketAddress("localhost", 0),
         new InetSocketAddress("localhost", 0));
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
index e86b5340ff5..7135efa2fcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
@@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
       <scope>compile</scope>
     </dependency>
     <dependency>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
index 3b0327ad4a1..2ba1bb060ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
@@ -26,6 +26,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.HashMap;
 
+import io.netty.channel.ChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,15 +55,13 @@ import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * RPC program corresponding to mountd daemon. See {@link Mountd}.
  */
+@ChannelHandler.Sharable
 public class RpcProgramMountd extends RpcProgram implements MountInterface {
   private static final Logger LOG =
       LoggerFactory.getLogger(RpcProgramMountd.class);
@@ -262,8 +264,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }
-    ChannelBuffer buf =
-        ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    ByteBuf buf =
+        Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
     RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
index c6da1981f37..c58dc5976b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.file.FileSystemException;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsConstants;
@@ -39,8 +41,6 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.security.IdMappingServiceProvider;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
 
 /**
  * Utility/helper methods related to NFS
@@ -147,16 +147,16 @@ public class Nfs3Utils {
     if (RpcProgramNfs3.LOG.isDebugEnabled()) {
       RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
     }
-    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
-    channel.write(outBuf);
+    ByteBuf outBuf = XDR.writeMessageTcp(out, true);
+    channel.writeAndFlush(outBuf);
   }
   
   public static void writeChannelCommit(Channel channel, XDR out, int xid) {
     if (RpcProgramNfs3.LOG.isDebugEnabled()) {
       RpcProgramNfs3.LOG.debug("Commit done:" + xid);
     }
-    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
-    channel.write(outBuf);
+    ByteBuf outBuf = XDR.writeMessageTcp(out, true);
+    channel.writeAndFlush(outBuf);
   }
 
   private static boolean isSet(int access, int bits) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index 528ead7a003..8358c056cac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.channel.Channel;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -55,7 +56,6 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.IdMappingServiceProvider;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
-import org.jboss.netty.channel.Channel;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
index d436eac598b..f6cb4350e40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
@@ -28,6 +28,11 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.EnumSet;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -129,10 +134,6 @@ import org.apache.hadoop.security.ShellBasedIdMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.util.JvmPauseMonitor;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -141,6 +142,7 @@ import org.slf4j.LoggerFactory;
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
  */
+@ChannelHandler.Sharable
 public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public static final int DEFAULT_UMASK = 0022;
   public static final FsPermission umask = new FsPermission(
@@ -2180,7 +2182,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
             RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
         rdr.write(reply);
 
-        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+        ByteBuf buf = Unpooled.wrappedBuffer(reply.asReadOnlyWrap()
             .buffer());
         RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
         RpcUtil.sendRpcResponse(ctx, rsp);
@@ -2291,7 +2293,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
     // TODO: currently we just return VerifierNone
     out = response.serialize(out, xid, new VerifierNone());
-    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+    ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
         .buffer());
     RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index 76859247bf2..d5c9d4f5592 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -22,12 +22,12 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
-import org.jboss.netty.channel.Channel;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index 28893710408..a1b6e12eebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import java.io.IOException;
 import java.util.EnumSet;
 
+import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -43,7 +44,6 @@ import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.IdMappingServiceProvider;
-import org.jboss.netty.channel.Channel;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
index 4e53c72bec8..31528a2db87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
@@ -21,6 +21,12 @@ package org.apache.hadoop.hdfs.nfs;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
@@ -42,13 +48,6 @@ import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
 
 public class TestOutOfOrderWrite {
   public final static Logger LOG =
@@ -100,9 +99,9 @@ public class TestOutOfOrderWrite {
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
       // Get handle from create response
-      ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+      ByteBuf buf = (ByteBuf) msg;
       XDR rsp = new XDR(buf.array());
       if (rsp.getBytes().length == 0) {
         LOG.info("rsp length is zero, why?");
@@ -125,7 +124,7 @@ public class TestOutOfOrderWrite {
       rsp.readBoolean(); // value follow
       handle = new FileHandle();
       handle.deserialize(rsp);
-      channel = e.getChannel();
+      channel = ctx.channel();
     }
   }
 
@@ -136,16 +135,17 @@ public class TestOutOfOrderWrite {
     }
 
     @Override
-    protected ChannelPipelineFactory setPipelineFactory() {
-      this.pipelineFactory = new ChannelPipelineFactory() {
+    protected ChannelInitializer<SocketChannel>  setChannelHandler() {
+      return new ChannelInitializer<SocketChannel>() {
         @Override
-        public ChannelPipeline getPipeline() {
-          return Channels.pipeline(
+        protected void initChannel(SocketChannel ch) throws Exception {
+          ChannelPipeline p = ch.pipeline();
+          p.addLast(
               RpcUtil.constructRpcFrameDecoder(),
-              new WriteHandler(request));
+              new WriteHandler(request)
+          );
         }
       };
-      return this.pipelineFactory;
     }
 
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
index 30ecc0b824b..07954c00d64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
+import io.netty.channel.Channel;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -92,7 +93,6 @@ import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.apache.hadoop.security.IdMappingConstant;
 import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.jboss.netty.channel.Channel;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index f7a92fac535..0f03c6da93b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentNavigableMap;
 
+import io.netty.channel.Channel;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -52,7 +53,6 @@ import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.apache.hadoop.security.ShellBasedIdMapping;
 import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.jboss.netty.channel.Channel;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org