You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2013/09/30 21:37:34 UTC
svn commit: r1527741 - in
/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src:
main/java/org/apache/hadoop/nfs/nfs3/ main/java/org/apache/hadoop/oncrpc/
main/java/org/apache/hadoop/oncrpc/security/
main/java/org/apache/hadoop/portmap/...
Author: brandonli
Date: Mon Sep 30 19:37:33 2013
New Revision: 1527741
URL: http://svn.apache.org/r1527741
Log:
HDFS-5230. Merging change r1527726 from trunk
Added:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
- copied unchanged from r1527726, hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
- copied unchanged from r1527726, hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
Removed:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java Mon Sep 30 19:37:33 2013
@@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcProgram;
-import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpServer;
-import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
import org.apache.hadoop.portmap.PortmapMapping;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
/**
* Nfs server. Supports NFS v3 using {@link RpcProgram}.
@@ -72,19 +67,7 @@ public abstract class Nfs3Base {
private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
- rpcProgram, 0) {
- @Override
- public ChannelPipelineFactory getPipelineFactory() {
- return new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
- RpcUtil.constructRpcFrameDecoder(),
- new SimpleTcpServerHandler(rpcProgram));
- }
- };
- }
- };
+ rpcProgram, 0);
tcpServer.run();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Mon Sep 30 19:37:33 2013
@@ -44,7 +44,7 @@ import com.google.common.annotations.Vis
public class RpcCallCache {
public static class CacheEntry {
- private XDR response; // null if no response has been sent
+ private RpcResponse response; // null if no response has been sent
public CacheEntry() {
response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
return response != null;
}
- public XDR getResponse() {
+ public RpcResponse getResponse() {
return response;
}
- public void setResponse(XDR response) {
+ public void setResponse(RpcResponse response) {
this.response = response;
}
}
@@ -128,13 +128,13 @@ public class RpcCallCache {
}
/** Mark a request as completed and add corresponding response to the cache */
- public void callCompleted(InetAddress clientId, int xid, XDR response) {
+ public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
ClientRequest req = new ClientRequest(clientId, xid);
CacheEntry e;
synchronized(map) {
e = map.get(req);
}
- e.setResponse(response);
+ e.response = response;
}
/**
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Mon Sep 30 19:37:33 2013
@@ -18,22 +18,24 @@
package org.apache.hadoop.oncrpc;
import java.io.IOException;
-import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
-import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private static final Log LOG = LogFactory.getLog(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
@@ -42,7 +44,6 @@ public abstract class RpcProgram {
private final int progNumber;
private final int lowProgVersion;
private final int highProgVersion;
- private final RpcCallCache rpcCallCache;
/**
* Constructor
@@ -53,19 +54,15 @@ public abstract class RpcProgram {
* @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported
- * @param cacheSize size of cache to handle duplciate requests. Size <= 0
- * indicates no cache.
*/
protected RpcProgram(String program, String host, int port, int progNumber,
- int lowProgVersion, int highProgVersion, int cacheSize) {
+ int lowProgVersion, int highProgVersion) {
this.program = program;
this.host = host;
this.port = port;
this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
- this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
- : null;
}
/**
@@ -103,92 +100,50 @@ public abstract class RpcProgram {
}
}
- /**
- * Handle an RPC request.
- * @param rpcCall RPC call that is received
- * @param in xdr with cursor at reading the remaining bytes of a method call
- * @param out xdr output corresponding to Rpc reply
- * @param client making the Rpc request
- * @param channel connection over which Rpc request is received
- * @return response xdr response
- */
- protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel);
-
- public XDR handle(XDR xdr, InetAddress client, Channel channel) {
- XDR out = new XDR();
- RpcCall rpcCall = RpcCall.read(xdr);
- if (LOG.isDebugEnabled()) {
- LOG.debug(program + " procedure #" + rpcCall.getProcedure());
- }
-
- if (!checkProgram(rpcCall.getProgram())) {
- return programMismatch(out, rpcCall);
- }
-
- if (!checkProgramVersion(rpcCall.getVersion())) {
- return programVersionMismatch(out, rpcCall);
- }
-
- // Check for duplicate requests in the cache for non-idempotent requests
- boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
- if (idempotent) {
- CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
- if (entry != null) { // in ache
- if (entry.isCompleted()) {
- LOG.info("Sending the cached reply to retransmitted request "
- + rpcCall.getXid());
- return entry.getResponse();
- } else { // else request is in progress
- LOG.info("Retransmitted request, transaction still in progress "
- + rpcCall.getXid());
- // TODO: ignore the request?
- }
- }
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcInfo info = (RpcInfo) e.getMessage();
+ RpcCall call = (RpcCall) info.header();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(program + " procedure #" + call.getProcedure());
}
- XDR response = handleInternal(rpcCall, xdr, out, client, channel);
- if (response.size() == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No sync response, expect an async response for request XID="
- + rpcCall.getXid());
- }
+ if (this.progNumber != call.getProgram()) {
+ LOG.warn("Invalid RPC call program " + call.getProgram());
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
+ }
+
+ int ver = call.getVersion();
+ if (ver < lowProgVersion || ver > highProgVersion) {
+ LOG.warn("Invalid RPC call version " + ver);
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ out.writeInt(lowProgVersion);
+ out.writeInt(highProgVersion);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
}
- // Add the request to the cache
- if (idempotent) {
- rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
- }
- return response;
- }
-
- private XDR programMismatch(XDR out, RpcCall call) {
- LOG.warn("Invalid RPC call program " + call.getProgram());
- RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
- AcceptState.PROG_UNAVAIL, new VerifierNone());
- reply.write(out);
- return out;
- }
-
- private XDR programVersionMismatch(XDR out, RpcCall call) {
- LOG.warn("Invalid RPC call version " + call.getVersion());
- RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
- AcceptState.PROG_MISMATCH, new VerifierNone());
- reply.write(out);
- out.writeInt(lowProgVersion);
- out.writeInt(highProgVersion);
- return out;
- }
-
- private boolean checkProgram(int progNumber) {
- return this.progNumber == progNumber;
- }
-
- /** Return true if a the program version in rpcCall is supported */
- private boolean checkProgramVersion(int programVersion) {
- return programVersion >= lowProgVersion
- && programVersion <= highProgVersion;
+ handleInternal(ctx, info);
}
+
+ protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
@Override
public String toString() {
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Mon Sep 30 19:37:33 2013
@@ -17,17 +17,23 @@
*/
package org.apache.hadoop.oncrpc;
+import java.nio.ByteBuffer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
-public class RpcUtil {
+public final class RpcUtil {
/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
+ * The XID in RPC call. It is used for starting with new seed after each
+ * reboot.
*/
private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
@@ -35,10 +41,27 @@ public class RpcUtil {
return xid = ++xid + caller.hashCode();
}
+ public static void sendRpcResponse(ChannelHandlerContext ctx,
+ RpcResponse response) {
+ Channels.fireMessageReceived(ctx, response);
+ }
+
public static FrameDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder();
}
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+ /**
+ * An RPC client can separate a RPC message into several frames (i.e.,
+ * fragments) when transferring it across the wire. RpcFrameDecoder
+ * reconstructs a full RPC message from these fragments.
+ *
+ * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+ * each RPC client.
+ */
static class RpcFrameDecoder extends FrameDecoder {
public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
private ChannelBuffer currentFrame;
@@ -78,4 +101,68 @@ public class RpcUtil {
}
}
}
+
+ /**
+ * RpcMessageParserStage parses the network bytes and encapsulates the RPC
+ * request into a RpcInfo instance.
+ */
+ static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+ private static final Log LOG = LogFactory
+ .getLog(RpcMessageParserStage.class);
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+ XDR in = new XDR(b, XDR.State.READING);
+
+ RpcInfo info = null;
+ try {
+ RpcCall callHeader = RpcCall.read(in);
+ ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+ .slice());
+ info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+ e.getRemoteAddress());
+ } catch (Exception exc) {
+ LOG.info("Malfromed RPC request from " + e.getRemoteAddress());
+ }
+
+ if (info != null) {
+ Channels.fireMessageReceived(ctx, info);
+ }
+ }
+ }
+
+ /**
+ * RpcTcpResponseStage sends an RpcResponse across the wire with the
+ * appropriate fragment header.
+ */
+ private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcResponse r = (RpcResponse) e.getMessage();
+ byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+ ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+ ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+ e.getChannel().write(d);
+ }
+ }
+
+ /**
+ * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+ * require a fragment header.
+ */
+ private static final class RpcUdpResponseStage extends
+ SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcResponse r = (RpcResponse) e.getMessage();
+ e.getChannel().write(r.data(), r.remoteAddress());
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Mon Sep 30 19:37:33 2013
@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFa
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.ni
public class SimpleTcpServer {
public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
protected final int port;
- protected final ChannelPipelineFactory pipelineFactory;
- protected final RpcProgram rpcProgram;
+ protected final SimpleChannelUpstreamHandler rpcProgram;
/** The maximum number of I/O worker threads */
protected final int workerCount;
@@ -50,18 +50,6 @@ public class SimpleTcpServer {
this.port = port;
this.rpcProgram = program;
this.workerCount = workercount;
- this.pipelineFactory = getPipelineFactory();
- }
-
- public ChannelPipelineFactory getPipelineFactory() {
- return new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
- RpcUtil.constructRpcFrameDecoder(),
- new SimpleTcpServerHandler(rpcProgram));
- }
- };
}
public void run() {
@@ -78,7 +66,15 @@ public class SimpleTcpServer {
}
ServerBootstrap bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(pipelineFactory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+ RpcUtil.STAGE_RPC_TCP_RESPONSE);
+ }
+ });
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Mon Sep 30 19:37:33 2013
@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
private final int RECEIVE_BUFFER_SIZE = 65536;
protected final int port;
- protected final ChannelPipelineFactory pipelineFactory;
- protected final RpcProgram rpcProgram;
+ protected final SimpleChannelUpstreamHandler rpcProgram;
protected final int workerCount;
- public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+ public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
this.port = port;
this.rpcProgram = program;
this.workerCount = workerCount;
- this.pipelineFactory = new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
- }
- };
}
public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
Executors.newCachedThreadPool(), workerCount);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
- ChannelPipeline p = b.getPipeline();
- p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+ b.setPipeline(Channels.pipeline(
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+ RpcUtil.STAGE_RPC_UDP_RESPONSE));
b.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java Mon Sep 30 19:37:33 2013
@@ -93,6 +93,10 @@ public final class XDR {
return n;
}
+ public ByteBuffer buffer() {
+ return buf.duplicate();
+ }
+
public int size() {
// TODO: This overloading intends to be compatible with the semantics of
// the previous version of the class. This function should be separated into
@@ -219,7 +223,7 @@ public final class XDR {
return xdr.buf.remaining() >= len;
}
- private static byte[] recordMark(int size, boolean last) {
+ static byte[] recordMark(int size, boolean last) {
byte[] b = new byte[SIZEOF_INT];
ByteBuffer buf = ByteBuffer.wrap(b);
buf.putInt(!last ? size : size | 0x80000000);
@@ -259,9 +263,8 @@ public final class XDR {
@VisibleForTesting
public byte[] getBytes() {
- ByteBuffer d = buf.duplicate();
- byte[] b = new byte[d.position()];
- d.flip();
+ ByteBuffer d = asReadOnlyWrap().buffer();
+ byte[] b = new byte[d.remaining()];
d.get(b);
return b;
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java Mon Sep 30 19:37:33 2013
@@ -18,16 +18,17 @@
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/**
* Base class for verifier. Currently our authentication only supports 3 types
- * of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS},
- * and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
+ * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS},
+ * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
* AUTH_NONE and RPCSEC_GSS
*/
public abstract class Verifier extends RpcAuthInfo {
+ public static final Verifier VERIFIER_NONE = new VerifierNone();
+
protected Verifier(AuthFlavor flavor) {
super(flavor);
}
@@ -61,6 +62,4 @@ public abstract class Verifier extends R
}
verifier.write(xdr);
}
-
-
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Mon Sep 30 19:37:33 2013
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.portmap;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;
@@ -26,10 +25,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* An rpcbind request handler.
@@ -44,7 +48,7 @@ public class RpcProgramPortmap extends R
private final HashMap<String, PortmapMapping> map;
public RpcProgramPortmap() {
- super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+ super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
map = new HashMap<String, PortmapMapping>(256);
}
@@ -130,10 +134,15 @@ public class RpcProgramPortmap extends R
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR in = new XDR(data);
+ XDR out = new XDR();
+
if (portmapProc == Procedure.PMAPPROC_NULL) {
out = nullOp(xid, in, out);
} else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -148,11 +157,14 @@ public class RpcProgramPortmap extends R
out = getport(xid, in, out);
} else {
LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
- RpcAcceptedReply.getInstance(xid,
- RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
- out);
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+ reply.write(out);
}
- return out;
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Mon Sep 30 19:37:33 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
@@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.junit.Test;
@@ -38,7 +38,7 @@ import org.mockito.Mockito;
public class TestFrameDecoder {
private static int port = 12345; // some random server port
- private static XDR result = null;
+ private static int resultSize;
static void testRequest(XDR request) {
SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -49,18 +49,20 @@ public class TestFrameDecoder {
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
- int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
- super(program, host, port, progNumber, lowProgVersion, highProgVersion,
- cacheSize);
+ int progNumber, int lowProgVersion, int highProgVersion) {
+ super(program, host, port, progNumber, lowProgVersion, highProgVersion);
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel) {
- // Get the final complete request and return a void response.
- result = in;
- RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out);
- return out;
+ protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ resultSize = info.data().readableBytes();
+ RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+ new VerifierNone());
+ XDR out = new XDR();
+ reply.write(out);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
@@ -147,21 +149,22 @@ public class TestFrameDecoder {
public void testFrames() {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
- "localhost", port, 100000, 1, 2, 100);
+ "localhost", port, 100000, 1, 2);
SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
tcpServer.run();
XDR xdrOut = createGetportMount();
+ int headerSize = xdrOut.size();
int bufsize = 2 * 1024 * 1024;
byte[] buffer = new byte[bufsize];
xdrOut.writeFixedOpaque(buffer);
- int requestSize = xdrOut.size();
+ int requestSize = xdrOut.size() - headerSize;
// Send the request to the server
testRequest(xdrOut);
// Verify the server got the request with right size
- assertTrue(requestSize == result.size());
+ assertEquals(requestSize, resultSize);
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
@@ -173,10 +176,6 @@ public class TestFrameDecoder {
static XDR createGetportMount() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
- xdr_out.writeInt(0); // AUTH_NULL
- xdr_out.writeInt(0); // cred len
- xdr_out.writeInt(0); // verifier AUTH_NULL
- xdr_out.writeInt(0); // verf len
return xdr_out;
}
/*
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java Mon Sep 30 19:37:33 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallC
import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
import org.junit.Test;
+import static org.mockito.Mockito.*;
+
/**
* Unit tests for {@link RpcCallCache}
*/
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
validateInprogressCacheEntry(e);
// Set call as completed
- XDR response = new XDR();
+ RpcResponse response = mock(RpcResponse.class);
cache.callCompleted(clientIp, xid, response);
e = cache.checkOrAddToCache(clientIp, xid);
validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
assertNull(c.getResponse());
}
- private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+ private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
assertFalse(c.isInProgress());
assertTrue(c.isCompleted());
assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
assertFalse(c.isCompleted());
assertNull(c.getResponse());
- XDR response = new XDR();
+ RpcResponse response = mock(RpcResponse.class);
c.setResponse(response);
validateCompletedCacheEntry(c, response);
}