You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/10/10 21:27:55 UTC
[hadoop] branch branch-3.3 updated: HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 6847ec0647c HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)
6847ec0647c is described below
commit 6847ec0647c3063bcbf9cf0315a77e247cae8534
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Mon Oct 10 22:27:43 2022 +0100
HADOOP-11245. Update NFS gateway to use Netty4 (#2832) (#4997)
Reviewed-by: Tsz-Wo Nicholas Sze <sz...@apache.org>
Co-authored-by: Wei-Chiu Chuang <we...@apache.org>
---
hadoop-common-project/hadoop-nfs/pom.xml | 2 +-
.../java/org/apache/hadoop/mount/MountdBase.java | 14 ++-
.../java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java | 7 +-
.../apache/hadoop/oncrpc/RegistrationClient.java | 13 +--
.../java/org/apache/hadoop/oncrpc/RpcInfo.java | 12 +-
.../java/org/apache/hadoop/oncrpc/RpcProgram.java | 19 ++--
.../java/org/apache/hadoop/oncrpc/RpcResponse.java | 23 ++--
.../java/org/apache/hadoop/oncrpc/RpcUtil.java | 123 +++++++++++---------
.../org/apache/hadoop/oncrpc/SimpleTcpClient.java | 78 ++++++++-----
.../hadoop/oncrpc/SimpleTcpClientHandler.java | 30 ++---
.../org/apache/hadoop/oncrpc/SimpleTcpServer.java | 76 +++++++------
.../org/apache/hadoop/oncrpc/SimpleUdpServer.java | 65 +++++++----
.../main/java/org/apache/hadoop/oncrpc/XDR.java | 12 +-
.../java/org/apache/hadoop/portmap/Portmap.java | 126 +++++++++++++--------
.../apache/hadoop/portmap/RpcProgramPortmap.java | 46 ++++----
.../org/apache/hadoop/oncrpc/TestFrameDecoder.java | 100 ++++++++--------
.../org/apache/hadoop/portmap/TestPortmap.java | 2 +-
hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml | 2 +-
.../hadoop/hdfs/nfs/mount/RpcProgramMountd.java | 12 +-
.../org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java | 12 +-
.../apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 2 +-
.../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java | 14 ++-
.../org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java | 2 +-
.../apache/hadoop/hdfs/nfs/nfs3/WriteManager.java | 2 +-
.../hadoop/hdfs/nfs/TestOutOfOrderWrite.java | 32 +++---
.../hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java | 2 +-
.../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java | 2 +-
27 files changed, 472 insertions(+), 358 deletions(-)
diff --git a/hadoop-common-project/hadoop-nfs/pom.xml b/hadoop-common-project/hadoop-nfs/pom.xml
index c0b4e14177c..baabb0fdc6e 100644
--- a/hadoop-common-project/hadoop-nfs/pom.xml
+++ b/hadoop-common-project/hadoop-nfs/pom.xml
@@ -90,7 +90,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java
index 0ff3084bf3e..58d3e51f2bd 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java
@@ -41,6 +41,8 @@ abstract public class MountdBase {
private final RpcProgram rpcProgram;
private int udpBoundPort; // Will set after server starts
private int tcpBoundPort; // Will set after server starts
+ private SimpleUdpServer udpServer = null;
+ private SimpleTcpServer tcpServer = null;
public RpcProgram getRpcProgram() {
return rpcProgram;
@@ -57,7 +59,7 @@ abstract public class MountdBase {
/* Start UDP server */
private void startUDPServer() {
- SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
+ udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
try {
@@ -76,7 +78,7 @@ abstract public class MountdBase {
/* Start TCP server */
private void startTCPServer() {
- SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
+ tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
try {
@@ -118,6 +120,14 @@ abstract public class MountdBase {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
tcpBoundPort = 0;
}
+ if (udpServer != null) {
+ udpServer.shutdown();
+ udpServer = null;
+ }
+ if (tcpServer != null) {
+ tcpServer.shutdown();
+ tcpServer = null;
+ }
}
/**
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
index ff83a5f19be..e6ea29b42bf 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
@@ -35,6 +35,7 @@ public abstract class Nfs3Base {
public static final Logger LOG = LoggerFactory.getLogger(Nfs3Base.class);
private final RpcProgram rpcProgram;
private int nfsBoundPort; // Will set after server starts
+ private SimpleTcpServer tcpServer = null;
public RpcProgram getRpcProgram() {
return rpcProgram;
@@ -61,7 +62,7 @@ public abstract class Nfs3Base {
}
private void startTCPServer() {
- SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
+ tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 0);
rpcProgram.startDaemons();
try {
@@ -84,6 +85,10 @@ public abstract class Nfs3Base {
nfsBoundPort = 0;
}
rpcProgram.stopDaemons();
+ if (tcpServer != null) {
+ tcpServer.shutdown();
+ tcpServer = null;
+ }
}
/**
* Priority of the nfsd shutdown hook.
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
index c8528ba4d55..c96f1d53bb4 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
@@ -19,10 +19,9 @@ package org.apache.hadoop.oncrpc;
import java.util.Arrays;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,10 +57,10 @@ public class RegistrationClient extends SimpleTcpClient {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ByteBuf buf = (ByteBuf) msg; // Read reply
if (!validMessageLength(buf.readableBytes())) {
- e.getChannel().close();
+ ctx.channel().close();
return;
}
@@ -83,7 +82,7 @@ public class RegistrationClient extends SimpleTcpClient {
RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
handle(deniedReply);
}
- e.getChannel().close(); // shutdown now that request is complete
+ ctx.channel().close(); // shutdown now that request is complete
}
private void handle(RpcDeniedReply deniedReply) {
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
index b434d79285c..aba8e9ea262 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.oncrpc;
import java.net.SocketAddress;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
/**
* RpcInfo records all contextual information of an RPC message. It contains
@@ -29,11 +29,11 @@ import org.jboss.netty.channel.ChannelHandlerContext;
*/
public final class RpcInfo {
private final RpcMessage header;
- private final ChannelBuffer data;
+ private final ByteBuf data;
private final Channel channel;
private final SocketAddress remoteAddress;
- public RpcInfo(RpcMessage header, ChannelBuffer data,
+ public RpcInfo(RpcMessage header, ByteBuf data,
ChannelHandlerContext channelContext, Channel channel,
SocketAddress remoteAddress) {
this.header = header;
@@ -46,7 +46,7 @@ public final class RpcInfo {
return header;
}
- public ChannelBuffer data() {
+ public ByteBuf data() {
return data;
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
index bafb49716b6..8b8d558255f 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
@@ -22,16 +22,15 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
-public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
+public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
@@ -161,9 +160,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
public void stopDaemons() {}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- RpcInfo info = (RpcInfo) e.getMessage();
+ RpcInfo info = (RpcInfo) msg;
RpcCall call = (RpcCall) info.header();
SocketAddress remoteAddress = info.remoteAddress();
@@ -221,7 +220,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
}
- ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
@@ -234,7 +233,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
RpcReply.ReplyState.MSG_DENIED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
reply.write(out);
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
index 2e45e6100b1..0d6431f68bd 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
@@ -19,27 +19,30 @@ package org.apache.hadoop.oncrpc;
import java.net.SocketAddress;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.DefaultAddressedEnvelope;
/**
* RpcResponse encapsulates a response to a RPC request. It contains the data
* that is going to cross the wire, as well as the information of the remote
* peer.
*/
-public class RpcResponse {
- private final ChannelBuffer data;
- private final SocketAddress remoteAddress;
+public class RpcResponse extends
+ DefaultAddressedEnvelope<ByteBuf, SocketAddress> {
+ public RpcResponse(ByteBuf message, SocketAddress recipient) {
+ super(message, recipient, null);
+ }
- public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
- this.data = data;
- this.remoteAddress = remoteAddress;
+ public RpcResponse(ByteBuf message, SocketAddress recipient,
+ SocketAddress sender) {
+ super(message, recipient, sender);
}
- public ChannelBuffer data() {
- return data;
+ public ByteBuf data() {
+ return this.content();
}
public SocketAddress remoteAddress() {
- return remoteAddress;
+ return this.recipient();
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
index cebebd27d0c..e8bc27d687f 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
@@ -17,16 +17,18 @@
*/
package org.apache.hadoop.oncrpc;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,16 +45,16 @@ public final class RpcUtil {
public static void sendRpcResponse(ChannelHandlerContext ctx,
RpcResponse response) {
- Channels.fireMessageReceived(ctx, response);
+ ctx.fireChannelRead(response);
}
- public static FrameDecoder constructRpcFrameDecoder() {
+ public static ByteToMessageDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder();
}
- public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
- public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
- public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+ public static final ChannelInboundHandlerAdapter STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+ public static final ChannelInboundHandlerAdapter STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+ public static final ChannelInboundHandlerAdapter STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
/**
* An RPC client can separate a RPC message into several frames (i.e.,
@@ -62,44 +64,39 @@ public final class RpcUtil {
* RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
* each RPC client.
*/
- static class RpcFrameDecoder extends FrameDecoder {
+ static class RpcFrameDecoder extends ByteToMessageDecoder {
public static final Logger LOG =
LoggerFactory.getLogger(RpcFrameDecoder.class);
- private ChannelBuffer currentFrame;
+ private volatile boolean isLast;
@Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel,
- ChannelBuffer buf) {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf buf,
+ List<Object> out) {
- if (buf.readableBytes() < 4)
- return null;
+ if (buf.readableBytes() < 4) {
+ return;
+ }
buf.markReaderIndex();
byte[] fragmentHeader = new byte[4];
buf.readBytes(fragmentHeader);
int length = XDR.fragmentSize(fragmentHeader);
- boolean isLast = XDR.isLastFragment(fragmentHeader);
+ isLast = XDR.isLastFragment(fragmentHeader);
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
- return null;
+ return;
}
- ChannelBuffer newFragment = buf.readSlice(length);
- if (currentFrame == null) {
- currentFrame = newFragment;
- } else {
- currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
- }
+ ByteBuf newFragment = buf.readSlice(length);
+ newFragment.retain();
+ out.add(newFragment);
+ }
- if (isLast) {
- ChannelBuffer completeFrame = currentFrame;
- currentFrame = null;
- return completeFrame;
- } else {
- return null;
- }
+ @VisibleForTesting
+ public boolean isLast() {
+ return isLast;
}
}
@@ -107,30 +104,44 @@ public final class RpcUtil {
* RpcMessageParserStage parses the network bytes and encapsulates the RPC
* request into a RpcInfo instance.
*/
- static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ static final class RpcMessageParserStage extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory
.getLogger(RpcMessageParserStage.class);
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
- ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+ ByteBuf buf;
+ SocketAddress remoteAddress;
+ if (msg instanceof DatagramPacket) {
+ DatagramPacket packet = (DatagramPacket)msg;
+ buf = packet.content();
+ remoteAddress = packet.sender();
+ } else {
+ buf = (ByteBuf) msg;
+ remoteAddress = ctx.channel().remoteAddress();
+ }
+
+ ByteBuffer b = buf.nioBuffer().asReadOnlyBuffer();
XDR in = new XDR(b, XDR.State.READING);
RpcInfo info = null;
try {
RpcCall callHeader = RpcCall.read(in);
- ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+ ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
.slice());
- info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
- e.getRemoteAddress());
+
+ info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
+ remoteAddress);
} catch (Exception exc) {
- LOG.info("Malformed RPC request from " + e.getRemoteAddress());
+ LOG.info("Malformed RPC request from " + remoteAddress);
+ } finally {
+ buf.release();
}
if (info != null) {
- Channels.fireMessageReceived(ctx, info);
+ ctx.fireChannelRead(info);
}
}
}
@@ -139,16 +150,17 @@ public final class RpcUtil {
* RpcTcpResponseStage sends an RpcResponse across the wire with the
* appropriate fragment header.
*/
- private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ private static class RpcTcpResponseStage extends ChannelInboundHandlerAdapter {
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- RpcResponse r = (RpcResponse) e.getMessage();
+ RpcResponse r = (RpcResponse) msg;
byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
- ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
- ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
- e.getChannel().write(d);
+ ByteBuf header = Unpooled.wrappedBuffer(fragmentHeader);
+ ByteBuf d = Unpooled.wrappedBuffer(header, r.data());
+ ctx.channel().writeAndFlush(d);
}
}
@@ -156,14 +168,17 @@ public final class RpcUtil {
* RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
* require a fragment header.
*/
+ @ChannelHandler.Sharable
private static final class RpcUdpResponseStage extends
- SimpleChannelUpstreamHandler {
+ ChannelInboundHandlerAdapter {
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- RpcResponse r = (RpcResponse) e.getMessage();
- e.getChannel().write(r.data(), r.remoteAddress());
+ RpcResponse r = (RpcResponse) msg;
+ // TODO: check out https://github.com/netty/netty/issues/1282 for
+ // correct usage
+ ctx.channel().writeAndFlush(r.data());
}
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
index 32e1b4b8392..7cfef6439b0 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
@@ -18,15 +18,16 @@
package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* A simple TCP based RPC client which just sends a request to a server.
@@ -35,8 +36,9 @@ public class SimpleTcpClient {
protected final String host;
protected final int port;
protected final XDR request;
- protected ChannelPipelineFactory pipelineFactory;
protected final boolean oneShot;
+ private NioEventLoopGroup workerGroup;
+ private ChannelFuture future;
public SimpleTcpClient(String host, int port, XDR request) {
this(host,port, request, true);
@@ -48,40 +50,54 @@ public class SimpleTcpClient {
this.request = request;
this.oneShot = oneShot;
}
-
- protected ChannelPipelineFactory setPipelineFactory() {
- this.pipelineFactory = new ChannelPipelineFactory() {
+
+ protected ChannelInitializer<SocketChannel> setChannelHandler() {
+ return new ChannelInitializer<SocketChannel>() {
@Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(
RpcUtil.constructRpcFrameDecoder(),
- new SimpleTcpClientHandler(request));
+ new SimpleTcpClientHandler(request)
+ );
}
};
- return this.pipelineFactory;
}
+ @VisibleForTesting
public void run() {
// Configure the client.
- ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
- ClientBootstrap bootstrap = new ClientBootstrap(factory);
-
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(setPipelineFactory());
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
+ workerGroup = new NioEventLoopGroup();
+ Bootstrap bootstrap = new Bootstrap()
+ .group(workerGroup)
+ .channel(NioSocketChannel.class);
- // Start the connection attempt.
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+ try {
+ future = bootstrap.handler(setChannelHandler())
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .connect(new InetSocketAddress(host, port)).sync();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ if (oneShot) {
+ stop();
+ }
+ }
+ }
- if (oneShot) {
- // Wait until the connection is closed or the connection attempt fails.
- future.getChannel().getCloseFuture().awaitUninterruptibly();
+ public void stop() {
+ try {
+ if (future != null) {
+ // Wait until the connection is closed or the connection attempt fails.
+ future.channel().closeFuture().sync();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
// Shut down thread pools to exit.
- bootstrap.releaseExternalResources();
+ workerGroup.shutdownGracefully();
}
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
index 23b6682361c..1acefc857f8 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
@@ -17,19 +17,19 @@
*/
package org.apache.hadoop.oncrpc;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
*/
-public class SimpleTcpClientHandler extends SimpleChannelHandler {
+public class SimpleTcpClientHandler extends ChannelInboundHandlerAdapter {
public static final Logger LOG =
LoggerFactory.getLogger(SimpleTcpClient.class);
protected final XDR request;
@@ -39,13 +39,13 @@ public class SimpleTcpClientHandler extends SimpleChannelHandler {
}
@Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send the request
if (LOG.isDebugEnabled()) {
LOG.debug("sending PRC request");
}
- ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
- e.getChannel().write(outBuf);
+ ByteBuf outBuf = XDR.writeMessageTcp(request, true);
+ ctx.channel().writeAndFlush(outBuf);
}
/**
@@ -53,13 +53,13 @@ public class SimpleTcpClientHandler extends SimpleChannelHandler {
* more interaction with the server.
*/
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- e.getChannel().close();
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ctx.channel().close();
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOG.warn("Unexpected exception from downstream: ", e.getCause());
- e.getChannel().close();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.warn("Unexpected exception from downstream: ", cause.getCause());
+ ctx.channel().close();
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
index 177fa3d80b1..29155c80b18 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
@@ -20,14 +20,17 @@ package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +42,11 @@ public class SimpleTcpServer {
LoggerFactory.getLogger(SimpleTcpServer.class);
protected final int port;
protected int boundPort = -1; // Will be set after server starts
- protected final SimpleChannelUpstreamHandler rpcProgram;
+ protected final ChannelInboundHandlerAdapter rpcProgram;
private ServerBootstrap server;
private Channel ch;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
/** The maximum number of I/O worker threads */
protected final int workerCount;
@@ -57,37 +62,32 @@ public class SimpleTcpServer {
this.workerCount = workercount;
}
- public void run() {
+ public void run() throws InterruptedException {
// Configure the Server.
- ChannelFactory factory;
- if (workerCount == 0) {
- // Use default workers: 2 * the number of available processors
- factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- } else {
- factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
- workerCount);
- }
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool());
- server = new ServerBootstrap(factory);
- server.setPipelineFactory(new ChannelPipelineFactory() {
+ server = new ServerBootstrap();
+ server.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
@Override
- public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
- }
- });
- server.setOption("child.tcpNoDelay", true);
- server.setOption("child.keepAlive", true);
- server.setOption("child.reuseAddress", true);
- server.setOption("reuseAddress", true);
+ }})
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_REUSEADDR, true);
// Listen to TCP port
- ch = server.bind(new InetSocketAddress(port));
- InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
+ ChannelFuture f = server.bind(new InetSocketAddress(port)).sync();
+ ch = f.channel();
+ InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress();
boundPort = socketAddr.getPort();
LOG.info("Started listening to TCP requests at port " + boundPort + " for "
@@ -102,9 +102,17 @@ public class SimpleTcpServer {
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
+ ch = null;
+ }
+
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ workerGroup = null;
}
- if (server != null) {
- server.releaseExternalResources();
+
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ bossGroup = null;
}
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
index e65003ca64b..516503c323a 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
@@ -20,12 +20,16 @@ package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.DatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,36 +43,45 @@ public class SimpleUdpServer {
private final int RECEIVE_BUFFER_SIZE = 65536;
protected final int port;
- protected final SimpleChannelUpstreamHandler rpcProgram;
+ protected final ChannelInboundHandlerAdapter rpcProgram;
protected final int workerCount;
protected int boundPort = -1; // Will be set after server starts
- private ConnectionlessBootstrap server;
+ private Bootstrap server;
private Channel ch;
+ private EventLoopGroup workerGroup;
- public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
+ public SimpleUdpServer(int port, ChannelInboundHandlerAdapter program,
int workerCount) {
this.port = port;
this.rpcProgram = program;
this.workerCount = workerCount;
}
- public void run() {
- // Configure the client.
- DatagramChannelFactory f = new NioDatagramChannelFactory(
- Executors.newCachedThreadPool(), workerCount);
+ public void run() throws InterruptedException {
+ workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool());
- server = new ConnectionlessBootstrap(f);
- server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
- rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
-
- server.setOption("broadcast", "false");
- server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
- server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
- server.setOption("reuseAddress", true);
+ server = new Bootstrap();
+ server.group(workerGroup)
+ .channel(NioDatagramChannel.class)
+ .option(ChannelOption.SO_BROADCAST, true)
+ .option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE)
+ .option(ChannelOption.SO_RCVBUF, RECEIVE_BUFFER_SIZE)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .handler(new ChannelInitializer<NioDatagramChannel>() {
+ @Override protected void initChannel(NioDatagramChannel ch)
+ throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER,
+ rpcProgram,
+ RpcUtil.STAGE_RPC_UDP_RESPONSE);
+ }
+ });
// Listen to the UDP port
- ch = server.bind(new InetSocketAddress(port));
- InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
+ ChannelFuture f = server.bind(new InetSocketAddress(port)).sync();
+ ch = f.channel();
+ InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress();
boundPort = socketAddr.getPort();
LOG.info("Started listening to UDP requests at port " + boundPort + " for "
@@ -83,9 +96,11 @@ public class SimpleUdpServer {
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
+ ch = null;
}
- if (server != null) {
- server.releaseExternalResources();
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ workerGroup = null;
}
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
index 419eff831f0..6000fd57a1b 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.oncrpc;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -242,7 +242,7 @@ public final class XDR {
* @param last specifies last request or not
* @return TCP buffer
*/
- public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+ public static ByteBuf writeMessageTcp(XDR request, boolean last) {
Preconditions.checkState(request.state == XDR.State.WRITING);
ByteBuffer b = request.buf.duplicate();
b.flip();
@@ -250,7 +250,7 @@ public final class XDR {
ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
// TODO: Investigate whether making a copy of the buffer is necessary.
- return ChannelBuffers.copiedBuffer(headerBuf, b);
+ return Unpooled.wrappedBuffer(headerBuf, b);
}
/**
@@ -258,10 +258,10 @@ public final class XDR {
* @param response XDR response
* @return UDP buffer
*/
- public static ChannelBuffer writeMessageUdp(XDR response) {
+ public static ByteBuf writeMessageUdp(XDR response) {
Preconditions.checkState(response.state == XDR.State.READING);
// TODO: Investigate whether making a copy of the buffer is necessary.
- return ChannelBuffers.copiedBuffer(response.buf);
+ return Unpooled.copiedBuffer(response.buf);
}
public static int fragmentSize(byte[] mark) {
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
index 80f43828ea8..1a8a305436c 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
@@ -22,21 +22,27 @@ import java.net.SocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.util.StringUtils;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.HashedWheelTimer;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -49,11 +55,17 @@ final class Portmap {
private static final Logger LOG = LoggerFactory.getLogger(Portmap.class);
private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000;
- private ConnectionlessBootstrap udpServer;
+ private Bootstrap udpServer;
private ServerBootstrap tcpServer;
- private ChannelGroup allChannels = new DefaultChannelGroup();
+ private ChannelGroup allChannels = new DefaultChannelGroup(
+ GlobalEventExecutor.INSTANCE);
private Channel udpChannel;
private Channel tcpChannel;
+
+ EventLoopGroup bossGroup;
+ EventLoopGroup workerGroup;
+ EventLoopGroup udpGroup;
+
private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels);
public static void main(String[] args) {
@@ -73,18 +85,19 @@ final class Portmap {
void shutdown() {
allChannels.close().awaitUninterruptibly();
- tcpServer.releaseExternalResources();
- udpServer.releaseExternalResources();
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ udpGroup.shutdownGracefully();
}
@VisibleForTesting
SocketAddress getTcpServerLocalAddress() {
- return tcpChannel.getLocalAddress();
+ return tcpChannel.localAddress();
}
@VisibleForTesting
SocketAddress getUdpServerLoAddress() {
- return udpChannel.getLocalAddress();
+ return udpChannel.localAddress();
}
@VisibleForTesting
@@ -93,38 +106,55 @@ final class Portmap {
}
void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
- final SocketAddress udpAddress) {
-
- tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
- tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
- private final HashedWheelTimer timer = new HashedWheelTimer();
- private final IdleStateHandler idleStateHandler = new IdleStateHandler(
- timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
- RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
- RpcUtil.STAGE_RPC_TCP_RESPONSE);
- }
- });
- tcpServer.setOption("reuseAddress", true);
- tcpServer.setOption("child.reuseAddress", true);
-
- udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
- Executors.newCachedThreadPool()));
-
- udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
- handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
- udpServer.setOption("reuseAddress", true);
-
- tcpChannel = tcpServer.bind(tcpAddress);
- udpChannel = udpServer.bind(udpAddress);
+ final SocketAddress udpAddress) throws InterruptedException {
+
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+ tcpServer = new ServerBootstrap();
+ tcpServer.group(bossGroup, workerGroup)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .childOption(ChannelOption.SO_REUSEADDR, true)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ private final IdleStateHandler idleStateHandler = new IdleStateHandler(
+ 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+
+ p.addLast(RpcUtil.constructRpcFrameDecoder(),
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
+ RpcUtil.STAGE_RPC_TCP_RESPONSE);
+ }});
+
+ udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
+
+ udpServer = new Bootstrap();
+ udpServer.group(udpGroup)
+ .channel(NioDatagramChannel.class)
+ .handler(new ChannelInitializer<NioDatagramChannel>() {
+ @Override protected void initChannel(NioDatagramChannel ch)
+ throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(
+ new LoggingHandler(LogLevel.DEBUG),
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, RpcUtil.STAGE_RPC_UDP_RESPONSE);
+ }
+ })
+ .option(ChannelOption.SO_REUSEADDR, true);
+
+ ChannelFuture tcpChannelFuture = null;
+ tcpChannelFuture = tcpServer.bind(tcpAddress);
+ ChannelFuture udpChannelFuture = udpServer.bind(udpAddress);
+ tcpChannel = tcpChannelFuture.sync().channel();
+ udpChannel = udpChannelFuture.sync().channel();
+
allChannels.add(tcpChannel);
allChannels.add(udpChannel);
- LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress()
- + ", udp://" + udpChannel.getLocalAddress());
+ LOG.info("Portmap server started at tcp://" + tcpChannel.localAddress()
+ + ", udp://" + udpChannel.localAddress());
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
index 0bc380f614c..7b33a644fbe 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
@@ -19,6 +19,14 @@ package org.apache.hadoop.portmap;
import java.util.concurrent.ConcurrentHashMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcInfo;
@@ -27,20 +35,12 @@ import org.apache.hadoop.oncrpc.RpcResponse;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.timeout.IdleState;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
+@ChannelHandler.Sharable
+final class RpcProgramPortmap extends IdleStateHandler {
static final int PROGRAM = 100000;
static final int VERSION = 2;
@@ -60,6 +60,8 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
private final ChannelGroup allChannels;
RpcProgramPortmap(ChannelGroup allChannels) {
+ super(1, 1, 1);
+ // FIXME: set default idle timeout 1 second.
this.allChannels = allChannels;
PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
@@ -151,14 +153,14 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- RpcInfo info = (RpcInfo) e.getMessage();
+ RpcInfo info = (RpcInfo) msg;
RpcCall rpcCall = (RpcCall) info.header();
final int portmapProc = rpcCall.getProcedure();
int xid = rpcCall.getXid();
- XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
+ XDR in = new XDR(info.data().nioBuffer().asReadOnlyBuffer(),
XDR.State.READING);
XDR out = new XDR();
@@ -181,29 +183,29 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
reply.write(out);
}
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- allChannels.add(e.getChannel());
+ allChannels.add(ctx.channel());
}
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
throws Exception {
- if (e.getState() == IdleState.ALL_IDLE) {
- e.getChannel().close();
+ if (e.state() == IdleState.ALL_IDLE) {
+ ctx.channel().close();
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOG.warn("Encountered ", e.getCause());
- e.getChannel().close();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
+ LOG.warn("Encountered ", t);
+ ctx.channel().close();
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
index 0e416b3738d..6d103fdd781 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
@@ -22,19 +22,19 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.test.GenericTestUtils;
-import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelHandlerContext;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.event.Level;
@@ -55,6 +55,7 @@ public class TestFrameDecoder {
tcpClient.run();
}
+ @ChannelHandler.Sharable
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
@@ -83,7 +84,7 @@ public class TestFrameDecoder {
new VerifierNone());
XDR out = new XDR();
reply.write(out);
- ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}
@@ -99,13 +100,14 @@ public class TestFrameDecoder {
RpcFrameDecoder decoder = new RpcFrameDecoder();
// Test "Length field is not received yet"
- ByteBuffer buffer = ByteBuffer.allocate(1);
- ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
- ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
- buf);
- assertTrue(channelBuffer == null);
+ ByteBuf buf = Unpooled.directBuffer(1);
+ List<Object> outputBufs = new ArrayList<>();
+ decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), buf,
+ outputBufs);
+ assertTrue(outputBufs.isEmpty());
+ decoder = new RpcFrameDecoder();
// Test all bytes are not received yet
byte[] fragment = new byte[4 + 9];
fragment[0] = (byte) (1 << 7); // final fragment
@@ -114,15 +116,16 @@ public class TestFrameDecoder {
fragment[3] = (byte) 10; // fragment size = 10 bytes
assertTrue(XDR.isLastFragment(fragment));
assertTrue(XDR.fragmentSize(fragment)==10);
+ buf.release();
- buffer = ByteBuffer.allocate(4 + 9);
- buffer.put(fragment);
- buffer.flip();
- buf = new ByteBufferBackedChannelBuffer(buffer);
- channelBuffer = (ChannelBuffer) decoder.decode(
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
- buf);
- assertTrue(channelBuffer == null);
+ buf = Unpooled.directBuffer(4 + 9);
+ buf.writeBytes(fragment);
+ outputBufs = new ArrayList<>();
+ decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), buf,
+ outputBufs);
+ assertTrue(decoder.isLast());
+ buf.release();
}
@Test
@@ -137,16 +140,15 @@ public class TestFrameDecoder {
fragment1[3] = (byte) 10; // fragment size = 10 bytes
assertFalse(XDR.isLastFragment(fragment1));
assertTrue(XDR.fragmentSize(fragment1)==10);
+
+ List<Object> outputBufs = new ArrayList<>();
// decoder should wait for the final fragment
- ByteBuffer buffer = ByteBuffer.allocate(4 + 10);
- buffer.put(fragment1);
- buffer.flip();
- ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
- ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
- buf);
- assertTrue(channelBuffer == null);
+ ByteBuf buf = Unpooled.directBuffer(4 + 10, 4 + 10);
+ buf.writeBytes(fragment1);
+ decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), buf,
+ outputBufs);
byte[] fragment2 = new byte[4 + 10];
fragment2[0] = (byte) (1 << 7); // final fragment
@@ -155,21 +157,22 @@ public class TestFrameDecoder {
fragment2[3] = (byte) 10; // fragment size = 10 bytes
assertTrue(XDR.isLastFragment(fragment2));
assertTrue(XDR.fragmentSize(fragment2)==10);
+ buf.release();
- buffer = ByteBuffer.allocate(4 + 10);
- buffer.put(fragment2);
- buffer.flip();
- buf = new ByteBufferBackedChannelBuffer(buffer);
- channelBuffer = (ChannelBuffer) decoder.decode(
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
- buf);
- assertTrue(channelBuffer != null);
- // Complete frame should have to total size 10+10=20
- assertEquals(20, channelBuffer.readableBytes());
+ buf = Unpooled.directBuffer(4 + 10, 4 + 10);
+ buf.writeBytes(fragment2);
+ decoder.decode(
+ Mockito.mock(ChannelHandlerContext.class), buf,
+ outputBufs);
+ // Expect two completed frames each 10 bytes
+ decoder.isLast();
+ assertEquals(2, outputBufs.size());
+ outputBufs.forEach(b -> assertEquals(((ByteBuf)b).readableBytes(), 10));
+ buf.release();
}
@Test
- public void testFrames() {
+ public void testFrames() throws InterruptedException {
int serverPort = startRpcServer(true);
XDR xdrOut = createGetportMount();
@@ -187,7 +190,7 @@ public class TestFrameDecoder {
}
@Test
- public void testUnprivilegedPort() {
+ public void testUnprivilegedPort() throws InterruptedException {
// Don't allow connections from unprivileged ports. Given that this test is
// presumably not being run by root, this will be the case.
int serverPort = startRpcServer(false);
@@ -218,23 +221,28 @@ public class TestFrameDecoder {
assertEquals(requestSize, resultSize);
}
- private static int startRpcServer(boolean allowInsecurePorts) {
+ private static int startRpcServer(boolean allowInsecurePorts)
+ throws InterruptedException {
Random rand = new Random();
int serverPort = 30000 + rand.nextInt(10000);
int retries = 10; // A few retries in case initial choice is in use.
while (true) {
+ SimpleTcpServer tcpServer = null;
try {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
"localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
- SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1);
+ tcpServer = new SimpleTcpServer(serverPort, program, 1);
tcpServer.run();
break; // Successfully bound a port, break out.
- } catch (ChannelException ce) {
+ } catch (InterruptedException | ChannelException e) {
+ if (tcpServer != null) {
+ tcpServer.shutdown();
+ }
if (retries-- > 0) {
serverPort += rand.nextInt(20); // Port in use? Try another.
} else {
- throw ce; // Out of retries.
+ throw e; // Out of retries.
}
}
}
diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
index 6941c4a04e9..8ebf9d03c6c 100644
--- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
+++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
@@ -43,7 +43,7 @@ public class TestPortmap {
private int xid;
@BeforeClass
- public static void setup() {
+ public static void setup() throws InterruptedException {
pm.start(SHORT_TIMEOUT_MILLISECONDS, new InetSocketAddress("localhost", 0),
new InetSocketAddress("localhost", 0));
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
index e24080f9873..03ac256ace8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
@@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
index 3b0327ad4a1..2ba1bb060ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
@@ -26,6 +26,10 @@ import java.util.Collections;
import java.util.List;
import java.util.HashMap;
+import io.netty.channel.ChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -51,15 +55,13 @@ import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* RPC program corresponding to mountd daemon. See {@link Mountd}.
*/
+@ChannelHandler.Sharable
public class RpcProgramMountd extends RpcProgram implements MountInterface {
private static final Logger LOG =
LoggerFactory.getLogger(RpcProgramMountd.class);
@@ -262,8 +264,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out);
}
- ChannelBuffer buf =
- ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ ByteBuf buf =
+ Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
index c6da1981f37..c58dc5976b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.FileSystemException;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
@@ -39,8 +41,6 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.security.IdMappingServiceProvider;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
/**
* Utility/helper methods related to NFS
@@ -147,16 +147,16 @@ public class Nfs3Utils {
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
}
- ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
- channel.write(outBuf);
+ ByteBuf outBuf = XDR.writeMessageTcp(out, true);
+ channel.writeAndFlush(outBuf);
}
public static void writeChannelCommit(Channel channel, XDR out, int xid) {
if (RpcProgramNfs3.LOG.isDebugEnabled()) {
RpcProgramNfs3.LOG.debug("Commit done:" + xid);
}
- ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
- channel.write(outBuf);
+ ByteBuf outBuf = XDR.writeMessageTcp(out, true);
+ channel.writeAndFlush(outBuf);
}
private static boolean isSet(int access, int bits) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index 528ead7a003..8358c056cac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
+import io.netty.channel.Channel;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -55,7 +56,6 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
-import org.jboss.netty.channel.Channel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
index d436eac598b..f6cb4350e40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
@@ -28,6 +28,11 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.EnumSet;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -129,10 +134,6 @@ import org.apache.hadoop.security.ShellBasedIdMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.JvmPauseMonitor;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -141,6 +142,7 @@ import org.slf4j.LoggerFactory;
/**
* RPC program corresponding to nfs daemon. See {@link Nfs3}.
*/
+@ChannelHandler.Sharable
public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
public static final int DEFAULT_UMASK = 0022;
public static final FsPermission umask = new FsPermission(
@@ -2180,7 +2182,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
rdr.write(reply);
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+ ByteBuf buf = Unpooled.wrappedBuffer(reply.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
@@ -2291,7 +2293,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
// TODO: currently we just return VerifierNone
out = response.serialize(out, xid, new VerifierNone());
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index 76859247bf2..d5c9d4f5592 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
-import org.jboss.netty.channel.Channel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index 28893710408..a1b6e12eebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.util.EnumSet;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -43,7 +44,6 @@ import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.IdMappingServiceProvider;
-import org.jboss.netty.channel.Channel;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
index 4e53c72bec8..31528a2db87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
@@ -21,6 +21,12 @@ package org.apache.hadoop.hdfs.nfs;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
@@ -42,13 +48,6 @@ import org.apache.hadoop.oncrpc.SimpleTcpClientHandler;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
public class TestOutOfOrderWrite {
public final static Logger LOG =
@@ -100,9 +99,9 @@ public class TestOutOfOrderWrite {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Get handle from create response
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ByteBuf buf = (ByteBuf) msg;
XDR rsp = new XDR(buf.array());
if (rsp.getBytes().length == 0) {
LOG.info("rsp length is zero, why?");
@@ -125,7 +124,7 @@ public class TestOutOfOrderWrite {
rsp.readBoolean(); // value follow
handle = new FileHandle();
handle.deserialize(rsp);
- channel = e.getChannel();
+ channel = ctx.channel();
}
}
@@ -136,16 +135,17 @@ public class TestOutOfOrderWrite {
}
@Override
- protected ChannelPipelineFactory setPipelineFactory() {
- this.pipelineFactory = new ChannelPipelineFactory() {
+ protected ChannelInitializer<SocketChannel> setChannelHandler() {
+ return new ChannelInitializer<SocketChannel>() {
@Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(
RpcUtil.constructRpcFrameDecoder(),
- new WriteHandler(request));
+ new WriteHandler(request)
+ );
}
};
- return this.pipelineFactory;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
index 30ecc0b824b..07954c00d64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
+import io.netty.channel.Channel;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -92,7 +93,6 @@ import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.security.IdMappingConstant;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.jboss.netty.channel.Channel;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index f7a92fac535..0f03c6da93b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.ConcurrentNavigableMap;
+import io.netty.channel.Channel;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -52,7 +53,6 @@ import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.security.ShellBasedIdMapping;
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.jboss.netty.channel.Channel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org