You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:15 UTC
svn commit: r1537330 [7/7] - in
/hadoop/common/branches/YARN-321/hadoop-common-project: ./
hadoop-annotations/ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/
hadoop-common/src/ hadoop-common/src/main/bin/ hadoop-common/src/main/conf/
hadoop-com...
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.oncrpc;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.security.Verifier;
/**
* Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details.
@@ -41,43 +41,42 @@ public class RpcAcceptedReply extends Rp
return ordinal();
}
};
+
+ public static RpcAcceptedReply getAcceptInstance(int xid,
+ Verifier verifier) {
+ return getInstance(xid, AcceptState.SUCCESS, verifier);
+ }
+
+ public static RpcAcceptedReply getInstance(int xid, AcceptState state,
+ Verifier verifier) {
+ return new RpcAcceptedReply(xid, ReplyState.MSG_ACCEPTED, verifier,
+ state);
+ }
- private final RpcAuthInfo verifier;
private final AcceptState acceptState;
- RpcAcceptedReply(int xid, RpcMessage.Type messageType, ReplyState state,
- RpcAuthInfo verifier, AcceptState acceptState) {
- super(xid, messageType, state);
- this.verifier = verifier;
+ RpcAcceptedReply(int xid, ReplyState state, Verifier verifier,
+ AcceptState acceptState) {
+ super(xid, state, verifier);
this.acceptState = acceptState;
}
- public static RpcAcceptedReply read(int xid, RpcMessage.Type messageType,
- ReplyState replyState, XDR xdr) {
- RpcAuthInfo verifier = RpcAuthInfo.read(xdr);
+ public static RpcAcceptedReply read(int xid, ReplyState replyState, XDR xdr) {
+ Verifier verifier = Verifier.readFlavorAndVerifier(xdr);
AcceptState acceptState = AcceptState.fromValue(xdr.readInt());
- return new RpcAcceptedReply(xid, messageType, replyState, verifier,
- acceptState);
- }
-
- public RpcAuthInfo getVerifier() {
- return verifier;
+ return new RpcAcceptedReply(xid, replyState, verifier, acceptState);
}
public AcceptState getAcceptState() {
return acceptState;
}
- public static XDR voidReply(XDR xdr, int xid) {
- return voidReply(xdr, xid, AcceptState.SUCCESS);
- }
-
- public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) {
+ @Override
+ public XDR write(XDR xdr) {
xdr.writeInt(xid);
- xdr.writeInt(RpcMessage.Type.RPC_REPLY.getValue());
- xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue());
- xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
- xdr.writeVariableOpaque(new byte[0]);
+ xdr.writeInt(messageType.getValue());
+ xdr.writeInt(replyState.getValue());
+ Verifier.writeFlavorAndVerifier(verifier, xdr);
xdr.writeInt(acceptState.getValue());
return xdr;
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,8 @@ package org.apache.hadoop.oncrpc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.security.Credentials;
+import org.apache.hadoop.oncrpc.security.Verifier;
/**
* Represents an RPC message of type RPC call as defined in RFC 1831
@@ -26,21 +28,36 @@ import org.apache.commons.logging.LogFac
public class RpcCall extends RpcMessage {
public static final int RPC_VERSION = 2;
private static final Log LOG = LogFactory.getLog(RpcCall.class);
+
+ public static RpcCall read(XDR xdr) {
+ return new RpcCall(xdr.readInt(), RpcMessage.Type.fromValue(xdr.readInt()),
+ xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(),
+ Credentials.readFlavorAndCredentials(xdr),
+ Verifier.readFlavorAndVerifier(xdr));
+ }
+
+ public static RpcCall getInstance(int xid, int program, int version,
+ int procedure, Credentials cred, Verifier verifier) {
+ return new RpcCall(xid, RpcMessage.Type.RPC_CALL, 2, program, version,
+ procedure, cred, verifier);
+ }
+
private final int rpcVersion;
private final int program;
private final int version;
private final int procedure;
- private final RpcAuthInfo credential;
- private final RpcAuthInfo verifier;
+ private final Credentials credentials;
+ private final Verifier verifier;
- protected RpcCall(int xid, RpcMessage.Type messageType, int rpcVersion, int program,
- int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) {
+ protected RpcCall(int xid, RpcMessage.Type messageType, int rpcVersion,
+ int program, int version, int procedure, Credentials credential,
+ Verifier verifier) {
super(xid, messageType);
this.rpcVersion = rpcVersion;
this.program = program;
this.version = version;
this.procedure = procedure;
- this.credential = credential;
+ this.credentials = credential;
this.verifier = verifier;
if (LOG.isTraceEnabled()) {
LOG.trace(this);
@@ -79,29 +96,25 @@ public class RpcCall extends RpcMessage
return procedure;
}
- public RpcAuthInfo getCredential() {
- return credential;
+ public Credentials getCredential() {
+ return credentials;
}
- public RpcAuthInfo getVerifier() {
+ public Verifier getVerifier() {
return verifier;
}
- public static RpcCall read(XDR xdr) {
- return new RpcCall(xdr.readInt(), RpcMessage.Type.fromValue(xdr.readInt()),
- xdr.readInt(), xdr.readInt(),
- xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr),
- RpcAuthInfo.read(xdr));
- }
-
- public static void write(XDR out, int xid, int program, int progVersion,
- int procedure) {
- out.writeInt(xid);
- out.writeInt(RpcMessage.Type.RPC_CALL.getValue());
- out.writeInt(2);
- out.writeInt(program);
- out.writeInt(progVersion);
- out.writeInt(procedure);
+ @Override
+ public XDR write(XDR xdr) {
+ xdr.writeInt(xid);
+ xdr.writeInt(RpcMessage.Type.RPC_CALL.getValue());
+ xdr.writeInt(2);
+ xdr.writeInt(program);
+ xdr.writeInt(version);
+ xdr.writeInt(procedure);
+ Credentials.writeFlavorAndCredentials(credentials, xdr);
+ Verifier.writeFlavorAndVerifier(verifier, xdr);
+ return xdr;
}
@Override
@@ -109,6 +122,6 @@ public class RpcCall extends RpcMessage
return String.format("Xid:%d, messageType:%s, rpcVersion:%d, program:%d,"
+ " version:%d, procedure:%d, credential:%s, verifier:%s", xid,
messageType, rpcVersion, program, version, procedure,
- credential.toString(), verifier.toString());
+ credentials.toString(), verifier.toString());
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Wed Oct 30 22:21:59 2013
@@ -44,7 +44,7 @@ import com.google.common.annotations.Vis
public class RpcCallCache {
public static class CacheEntry {
- private XDR response; // null if no response has been sent
+ private RpcResponse response; // null if no response has been sent
public CacheEntry() {
response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
return response != null;
}
- public XDR getResponse() {
+ public RpcResponse getResponse() {
return response;
}
- public void setResponse(XDR response) {
+ public void setResponse(RpcResponse response) {
this.response = response;
}
}
@@ -128,13 +128,13 @@ public class RpcCallCache {
}
/** Mark a request as completed and add corresponding response to the cache */
- public void callCompleted(InetAddress clientId, int xid, XDR response) {
+ public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
ClientRequest req = new ClientRequest(clientId, xid);
CacheEntry e;
synchronized(map) {
e = map.get(req);
}
- e.setResponse(response);
+ e.response = response;
}
/**
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.oncrpc;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.security.Verifier;
/**
* Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.
@@ -40,16 +40,16 @@ public class RpcDeniedReply extends RpcR
private final RejectState rejectState;
- RpcDeniedReply(int xid, RpcMessage.Type messageType, ReplyState replyState,
- RejectState rejectState) {
- super(xid, messageType, replyState);
+ public RpcDeniedReply(int xid, ReplyState replyState,
+ RejectState rejectState, Verifier verifier) {
+ super(xid, replyState, verifier);
this.rejectState = rejectState;
}
- public static RpcDeniedReply read(int xid, RpcMessage.Type messageType,
- ReplyState replyState, XDR xdr) {
+ public static RpcDeniedReply read(int xid, ReplyState replyState, XDR xdr) {
+ Verifier verifier = Verifier.readFlavorAndVerifier(xdr);
RejectState rejectState = RejectState.fromValue(xdr.readInt());
- return new RpcDeniedReply(xid, messageType, replyState, rejectState);
+ return new RpcDeniedReply(xid, replyState, rejectState, verifier);
}
public RejectState getRejectState() {
@@ -59,17 +59,17 @@ public class RpcDeniedReply extends RpcR
@Override
public String toString() {
return new StringBuffer().append("xid:").append(xid)
- .append(",messageType:").append(messageType).append("rejectState:")
+ .append(",messageType:").append(messageType).append("verifier_flavor:")
+ .append(verifier.getFlavor()).append("rejectState:")
.append(rejectState).toString();
}
- public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted,
- RejectState rejectState) {
+ @Override
+ public XDR write(XDR xdr) {
xdr.writeInt(xid);
- xdr.writeInt(RpcMessage.Type.RPC_REPLY.getValue());
- xdr.writeInt(msgAccepted.getValue());
- xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
- xdr.writeVariableOpaque(new byte[0]);
+ xdr.writeInt(messageType.getValue());
+ xdr.writeInt(replyState.getValue());
+ Verifier.writeFlavorAndVerifier(verifier, xdr);
xdr.writeInt(rejectState.getValue());
return xdr;
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java Wed Oct 30 22:21:59 2013
@@ -50,6 +50,8 @@ public abstract class RpcMessage {
this.messageType = messageType;
}
+ public abstract XDR write(XDR xdr);
+
public int getXid() {
return xid;
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Wed Oct 30 22:21:59 2013
@@ -18,21 +18,24 @@
package org.apache.hadoop.oncrpc;
import java.io.IOException;
-import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private static final Log LOG = LogFactory.getLog(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
@@ -41,7 +44,6 @@ public abstract class RpcProgram {
private final int progNumber;
private final int lowProgVersion;
private final int highProgVersion;
- private final RpcCallCache rpcCallCache;
/**
* Constructor
@@ -52,19 +54,15 @@ public abstract class RpcProgram {
* @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported
- * @param cacheSize size of cache to handle duplciate requests. Size <= 0
- * indicates no cache.
*/
protected RpcProgram(String program, String host, int port, int progNumber,
- int lowProgVersion, int highProgVersion, int cacheSize) {
+ int lowProgVersion, int highProgVersion) {
this.program = program;
this.host = host;
this.port = port;
this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
- this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
- : null;
}
/**
@@ -102,88 +100,50 @@ public abstract class RpcProgram {
}
}
- /**
- * Handle an RPC request.
- * @param rpcCall RPC call that is received
- * @param in xdr with cursor at reading the remaining bytes of a method call
- * @param out xdr output corresponding to Rpc reply
- * @param client making the Rpc request
- * @param channel connection over which Rpc request is received
- * @return response xdr response
- */
- protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel);
-
- public XDR handle(XDR xdr, InetAddress client, Channel channel) {
- XDR out = new XDR();
- RpcCall rpcCall = RpcCall.read(xdr);
- if (LOG.isDebugEnabled()) {
- LOG.debug(program + " procedure #" + rpcCall.getProcedure());
- }
-
- if (!checkProgram(rpcCall.getProgram())) {
- return programMismatch(out, rpcCall);
- }
-
- if (!checkProgramVersion(rpcCall.getVersion())) {
- return programVersionMismatch(out, rpcCall);
- }
-
- // Check for duplicate requests in the cache for non-idempotent requests
- boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
- if (idempotent) {
- CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
- if (entry != null) { // in ache
- if (entry.isCompleted()) {
- LOG.info("Sending the cached reply to retransmitted request "
- + rpcCall.getXid());
- return entry.getResponse();
- } else { // else request is in progress
- LOG.info("Retransmitted request, transaction still in progress "
- + rpcCall.getXid());
- // TODO: ignore the request?
- }
- }
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcInfo info = (RpcInfo) e.getMessage();
+ RpcCall call = (RpcCall) info.header();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(program + " procedure #" + call.getProcedure());
}
- XDR response = handleInternal(rpcCall, xdr, out, client, channel);
- if (response.size() == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No sync response, expect an async response for request XID="
- + rpcCall.getXid());
- }
+ if (this.progNumber != call.getProgram()) {
+ LOG.warn("Invalid RPC call program " + call.getProgram());
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
+ }
+
+ int ver = call.getVersion();
+ if (ver < lowProgVersion || ver > highProgVersion) {
+ LOG.warn("Invalid RPC call version " + ver);
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ out.writeInt(lowProgVersion);
+ out.writeInt(highProgVersion);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
}
- // Add the request to the cache
- if (idempotent) {
- rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
- }
- return response;
- }
-
- private XDR programMismatch(XDR out, RpcCall call) {
- LOG.warn("Invalid RPC call program " + call.getProgram());
- RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL);
- return out;
- }
-
- private XDR programVersionMismatch(XDR out, RpcCall call) {
- LOG.warn("Invalid RPC call version " + call.getVersion());
- RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH);
- out.writeInt(lowProgVersion);
- out.writeInt(highProgVersion);
- return out;
- }
-
- private boolean checkProgram(int progNumber) {
- return this.progNumber == progNumber;
- }
-
- /** Return true if a the program version in rpcCall is supported */
- private boolean checkProgramVersion(int programVersion) {
- return programVersion >= lowProgVersion
- && programVersion <= highProgVersion;
+ handleInternal(ctx, info);
}
+
+ protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
@Override
public String toString() {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java Wed Oct 30 22:21:59 2013
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.oncrpc;
+import org.apache.hadoop.oncrpc.security.RpcAuthInfo;
+import org.apache.hadoop.oncrpc.security.Verifier;
+
+import com.google.common.base.Preconditions;
+
/**
* Represents an RPC message of type RPC reply as defined in RFC 1831
*/
@@ -36,28 +41,35 @@ public abstract class RpcReply extends R
}
}
- private final ReplyState state;
+ protected final ReplyState replyState;
+ protected final Verifier verifier;
- RpcReply(int xid, RpcMessage.Type messageType, ReplyState state) {
- super(xid, messageType);
- this.state = state;
- validateMessageType(RpcMessage.Type.RPC_REPLY);
+ RpcReply(int xid, ReplyState state, Verifier verifier) {
+ super(xid, RpcMessage.Type.RPC_REPLY);
+ this.replyState = state;
+ this.verifier = verifier;
+ }
+
+ public RpcAuthInfo getVerifier() {
+ return verifier;
}
public static RpcReply read(XDR xdr) {
int xid = xdr.readInt();
final Type messageType = Type.fromValue(xdr.readInt());
+ Preconditions.checkState(messageType == RpcMessage.Type.RPC_REPLY);
+
ReplyState stat = ReplyState.fromValue(xdr.readInt());
switch (stat) {
case MSG_ACCEPTED:
- return RpcAcceptedReply.read(xid, messageType, stat, xdr);
+ return RpcAcceptedReply.read(xid, stat, xdr);
case MSG_DENIED:
- return RpcDeniedReply.read(xid, messageType, stat, xdr);
+ return RpcDeniedReply.read(xid, stat, xdr);
}
return null;
}
public ReplyState getState() {
- return state;
+ return replyState;
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Wed Oct 30 22:21:59 2013
@@ -17,13 +17,152 @@
*/
package org.apache.hadoop.oncrpc;
-/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
- */
-public class RpcUtil {
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public final class RpcUtil {
+ /**
+ * The XID in RPC call. It is used for starting with new seed after each
+ * reboot.
+ */
private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
public static int getNewXid(String caller) {
return xid = ++xid + caller.hashCode();
}
+
+ public static void sendRpcResponse(ChannelHandlerContext ctx,
+ RpcResponse response) {
+ Channels.fireMessageReceived(ctx, response);
+ }
+
+ public static FrameDecoder constructRpcFrameDecoder() {
+ return new RpcFrameDecoder();
+ }
+
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+ /**
+ * An RPC client can separate an RPC message into several frames (i.e.,
+ * fragments) when transferring it across the wire. RpcFrameDecoder
+ * reconstructs a full RPC message from these fragments.
+ *
+ * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+ * each RPC client.
+ */
+ static class RpcFrameDecoder extends FrameDecoder {
+ public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+ private ChannelBuffer currentFrame;
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buf) {
+
+ if (buf.readableBytes() < 4)
+ return null;
+
+ buf.markReaderIndex();
+
+ byte[] fragmentHeader = new byte[4];
+ buf.readBytes(fragmentHeader);
+ int length = XDR.fragmentSize(fragmentHeader);
+ boolean isLast = XDR.isLastFragment(fragmentHeader);
+
+ if (buf.readableBytes() < length) {
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ ChannelBuffer newFragment = buf.readSlice(length);
+ if (currentFrame == null) {
+ currentFrame = newFragment;
+ } else {
+ currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
+ }
+
+ if (isLast) {
+ ChannelBuffer completeFrame = currentFrame;
+ currentFrame = null;
+ return completeFrame;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * RpcMessageParserStage parses the network bytes and encapsulates the RPC
+ * request into an RpcInfo instance.
+ */
+ static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+ private static final Log LOG = LogFactory
+ .getLog(RpcMessageParserStage.class);
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+ XDR in = new XDR(b, XDR.State.READING);
+
+ RpcInfo info = null;
+ try {
+ RpcCall callHeader = RpcCall.read(in);
+ ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+ .slice());
+ info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+ e.getRemoteAddress());
+ } catch (Exception exc) {
+ LOG.info("Malfromed RPC request from " + e.getRemoteAddress());
+ }
+
+ if (info != null) {
+ Channels.fireMessageReceived(ctx, info);
+ }
+ }
+ }
+
+ /**
+ * RpcTcpResponseStage sends an RpcResponse across the wire with the
+ * appropriate fragment header.
+ */
+ private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcResponse r = (RpcResponse) e.getMessage();
+ byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+ ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+ ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+ e.getChannel().write(d);
+ }
+ }
+
+ /**
+ * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+ * require a fragment header.
+ */
+ private static final class RpcUdpResponseStage extends
+ SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcResponse r = (RpcResponse) e.getMessage();
+ e.getChannel().write(r.data(), r.remoteAddress());
+ }
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java Wed Oct 30 22:21:59 2013
@@ -20,8 +20,6 @@ package org.apache.hadoop.oncrpc;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
-import org.apache.hadoop.oncrpc.XDR;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@@ -55,7 +53,8 @@ public class SimpleTcpClient {
this.pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
- return Channels.pipeline(new RpcFrameDecoder(),
+ return Channels.pipeline(
+ RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpClientHandler(request));
}
};
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Wed Oct 30 22:21:59 2013
@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFa
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.ni
public class SimpleTcpServer {
public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
protected final int port;
- protected final ChannelPipelineFactory pipelineFactory;
- protected final RpcProgram rpcProgram;
+ protected final SimpleChannelUpstreamHandler rpcProgram;
/** The maximum number of I/O worker threads */
protected final int workerCount;
@@ -50,17 +50,6 @@ public class SimpleTcpServer {
this.port = port;
this.rpcProgram = program;
this.workerCount = workercount;
- this.pipelineFactory = getPipelineFactory();
- }
-
- public ChannelPipelineFactory getPipelineFactory() {
- return new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(new RpcFrameDecoder(),
- new SimpleTcpServerHandler(rpcProgram));
- }
- };
}
public void run() {
@@ -77,7 +66,15 @@ public class SimpleTcpServer {
}
ServerBootstrap bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(pipelineFactory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+ RpcUtil.STAGE_RPC_TCP_RESPONSE);
+ }
+ });
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java Wed Oct 30 22:21:59 2013
@@ -57,8 +57,7 @@ public class SimpleUdpClient {
clientSocket.receive(receivePacket);
// Check reply status
- XDR xdr = new XDR();
- xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
+ XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
receivePacket.getLength()));
RpcReply reply = RpcReply.read(xdr);
if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Wed Oct 30 22:21:59 2013
@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
private final int RECEIVE_BUFFER_SIZE = 65536;
protected final int port;
- protected final ChannelPipelineFactory pipelineFactory;
- protected final RpcProgram rpcProgram;
+ protected final SimpleChannelUpstreamHandler rpcProgram;
protected final int workerCount;
- public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+ public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
this.port = port;
this.rpcProgram = program;
this.workerCount = workerCount;
- this.pipelineFactory = new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
- }
- };
}
public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
Executors.newCachedThreadPool(), workerCount);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
- ChannelPipeline p = b.getPipeline();
- p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+ b.setPipeline(Channels.pipeline(
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+ RpcUtil.STAGE_RPC_UDP_RESPONSE));
b.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java Wed Oct 30 22:21:59 2013
@@ -17,402 +17,256 @@
*/
package org.apache.hadoop.oncrpc;
-import java.io.PrintStream;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
* Utility class for building XDR messages based on RFC 4506.
- * <p>
- * This class maintains a buffer into which java types are written as
- * XDR types for building XDR messages. Similarly this class can
- * be used to get java types from an XDR request or response.
- * <p>
- * Currently only a subset of XDR types defined in RFC 4506 are supported.
+ *
+ * Key points of the format:
+ *
+ * <ul>
+ * <li>Primitives are stored in big-endian order (i.e., the default byte order
+ * of ByteBuffer).</li>
+ * <li>Booleans are stored as an integer.</li>
+ * <li>Each field in the message is always aligned to a 4-byte boundary.</li>
+ * </ul>
+ *
*/
-public class XDR {
- private final static String HEXES = "0123456789abcdef";
-
- /** Internal buffer for reading or writing to */
- private byte[] bytearr;
-
- /** Place to read from or write to */
- private int cursor;
+public final class XDR {
+ private static final int DEFAULT_INITIAL_CAPACITY = 256;
+ private static final int SIZEOF_INT = 4;
+ private static final int SIZEOF_LONG = 8;
+ private static final byte[] PADDING_BYTES = new byte[] { 0, 0, 0, 0 };
- public XDR() {
- this(new byte[0]);
- }
+ private ByteBuffer buf;
- public XDR(byte[] data) {
- bytearr = Arrays.copyOf(data, data.length);
- cursor = 0;
+ public enum State {
+ READING, WRITING,
}
+ private final State state;
+
/**
- * @param bytes bytes to be appended to internal buffer
+ * Construct a new XDR message buffer.
+ *
+ * @param initialCapacity
+ * the initial capacity of the buffer.
*/
- private void append(byte[] bytesToAdd) {
- bytearr = append(bytearr, bytesToAdd);
+ public XDR(int initialCapacity) {
+ this(ByteBuffer.allocate(initialCapacity), State.WRITING);
}
- public int size() {
- return bytearr.length;
+ public XDR() {
+ this(DEFAULT_INITIAL_CAPACITY);
}
- /** Skip some bytes by moving the cursor */
- public void skip(int size) {
- cursor += size;
+ public XDR(ByteBuffer buf, State state) {
+ this.buf = buf;
+ this.state = state;
}
/**
- * Write Java primitive integer as XDR signed integer.
- *
- * Definition of XDR signed integer from RFC 4506:
- * <pre>
- * An XDR signed integer is a 32-bit datum that encodes an integer in
- * the range [-2147483648,2147483647]. The integer is represented in
- * two's complement notation. The most and least significant bytes are
- * 0 and 3, respectively. Integers are declared as follows:
+ * Wraps a byte array as a read-only XDR message. There's no copy involved,
+ * thus it is the client's responsibility to ensure that the byte array
+ * remains unmodified when using the XDR object.
*
- * int identifier;
- *
- * (MSB) (LSB)
- * +-------+-------+-------+-------+
- * |byte 0 |byte 1 |byte 2 |byte 3 | INTEGER
- * +-------+-------+-------+-------+
- * <------------32 bits------------>
- * </pre>
+ * @param src
+ * the byte array to be wrapped.
*/
- public void writeInt(int data) {
- append(toBytes(data));
+ public XDR(byte[] src) {
+ this(ByteBuffer.wrap(src).asReadOnlyBuffer(), State.READING);
+ }
+
+ public XDR asReadOnlyWrap() {
+ ByteBuffer b = buf.asReadOnlyBuffer();
+ if (state == State.WRITING) {
+ b.flip();
+ }
+
+ XDR n = new XDR(b, State.READING);
+ return n;
+ }
+
+ public ByteBuffer buffer() {
+ return buf.duplicate();
+ }
+
+ public int size() {
+ // TODO: This dual behavior (limit when reading, position when writing) keeps
+ // the semantics of the previous version of the class. This function should be
+ // split into two methods with clear semantics.
+ return state == State.READING ? buf.limit() : buf.position();
}
- /**
- * Read an XDR signed integer and return as Java primitive integer.
- */
public int readInt() {
- byte byte0 = bytearr[cursor++];
- byte byte1 = bytearr[cursor++];
- byte byte2 = bytearr[cursor++];
- byte byte3 = bytearr[cursor++];
- return (XDR.toShort(byte0) << 24) + (XDR.toShort(byte1) << 16)
- + (XDR.toShort(byte2) << 8) + XDR.toShort(byte3);
+ Preconditions.checkState(state == State.READING);
+ return buf.getInt();
}
- /**
- * Write Java primitive boolean as an XDR boolean.
- *
- * Definition of XDR boolean from RFC 4506:
- * <pre>
- * Booleans are important enough and occur frequently enough to warrant
- * their own explicit type in the standard. Booleans are declared as
- * follows:
- *
- * bool identifier;
- *
- * This is equivalent to:
- *
- * enum { FALSE = 0, TRUE = 1 } identifier;
- * </pre>
- */
- public void writeBoolean(boolean data) {
- this.writeInt(data ? 1 : 0);
+ public void writeInt(int v) {
+ ensureFreeSpace(SIZEOF_INT);
+ buf.putInt(v);
}
- /**
- * Read an XDR boolean and return as Java primitive boolean.
- */
public boolean readBoolean() {
- return readInt() == 0 ? false : true;
+ Preconditions.checkState(state == State.READING);
+ return buf.getInt() != 0;
}
- /**
- * Write Java primitive long to an XDR signed long.
- *
- * Definition of XDR signed long from RFC 4506:
- * <pre>
- * The standard also defines 64-bit (8-byte) numbers called hyper
- * integers and unsigned hyper integers. Their representations are the
- * obvious extensions of integer and unsigned integer defined above.
- * They are represented in two's complement notation.The most and
- * least significant bytes are 0 and 7, respectively. Their
- * declarations:
- *
- * hyper identifier; unsigned hyper identifier;
- *
- * (MSB) (LSB)
- * +-------+-------+-------+-------+-------+-------+-------+-------+
- * |byte 0 |byte 1 |byte 2 |byte 3 |byte 4 |byte 5 |byte 6 |byte 7 |
- * +-------+-------+-------+-------+-------+-------+-------+-------+
- * <----------------------------64 bits---------------------------->
- * HYPER INTEGER
- * UNSIGNED HYPER INTEGER
- * </pre>
- */
- public void writeLongAsHyper(long data) {
- byte byte0 = (byte) ((data & 0xff00000000000000l) >> 56);
- byte byte1 = (byte) ((data & 0x00ff000000000000l) >> 48);
- byte byte2 = (byte) ((data & 0x0000ff0000000000l) >> 40);
- byte byte3 = (byte) ((data & 0x000000ff00000000l) >> 32);
- byte byte4 = (byte) ((data & 0x00000000ff000000l) >> 24);
- byte byte5 = (byte) ((data & 0x0000000000ff0000l) >> 16);
- byte byte6 = (byte) ((data & 0x000000000000ff00l) >> 8);
- byte byte7 = (byte) ((data & 0x00000000000000ffl));
- this.append(new byte[] { byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7 });
+ public void writeBoolean(boolean v) {
+ ensureFreeSpace(SIZEOF_INT);
+ buf.putInt(v ? 1 : 0);
}
- /**
- * Read XDR signed hyper and return as java primitive long.
- */
public long readHyper() {
- byte byte0 = bytearr[cursor++];
- byte byte1 = bytearr[cursor++];
- byte byte2 = bytearr[cursor++];
- byte byte3 = bytearr[cursor++];
- byte byte4 = bytearr[cursor++];
- byte byte5 = bytearr[cursor++];
- byte byte6 = bytearr[cursor++];
- byte byte7 = bytearr[cursor++];
- return ((long) XDR.toShort(byte0) << 56)
- + ((long) XDR.toShort(byte1) << 48) + ((long) XDR.toShort(byte2) << 40)
- + ((long) XDR.toShort(byte3) << 32) + ((long) XDR.toShort(byte4) << 24)
- + ((long) XDR.toShort(byte5) << 16) + ((long) XDR.toShort(byte6) << 8)
- + XDR.toShort(byte7);
+ Preconditions.checkState(state == State.READING);
+ return buf.getLong();
}
- /**
- * Write a Java primitive byte array to XDR fixed-length opaque data.
- *
- * Defintion of fixed-length opaque data from RFC 4506:
- * <pre>
- * At times, fixed-length uninterpreted data needs to be passed among
- * machines. This data is called "opaque" and is declared as follows:
- *
- * opaque identifier[n];
- *
- * where the constant n is the (static) number of bytes necessary to
- * contain the opaque data. If n is not a multiple of four, then the n
- * bytes are followed by enough (0 to 3) residual zero bytes, r, to make
- * the total byte count of the opaque object a multiple of four.
- *
- * 0 1 ...
- * +--------+--------+...+--------+--------+...+--------+
- * | byte 0 | byte 1 |...|byte n-1| 0 |...| 0 |
- * +--------+--------+...+--------+--------+...+--------+
- * |<-----------n bytes---------->|<------r bytes------>|
- * |<-----------n+r (where (n+r) mod 4 = 0)------------>|
- * FIXED-LENGTH OPAQUE
- * </pre>
- */
- public void writeFixedOpaque(byte[] data) {
- writeFixedOpaque(data, data.length);
- }
-
- public void writeFixedOpaque(byte[] data, int length) {
- append(Arrays.copyOf(data, length + XDR.pad(length, 4)));
+ public void writeLongAsHyper(long v) {
+ ensureFreeSpace(SIZEOF_LONG);
+ buf.putLong(v);
}
public byte[] readFixedOpaque(int size) {
- byte[] ret = new byte[size];
- for(int i = 0; i < size; i++) {
- ret[i] = bytearr[cursor];
- cursor++;
- }
+ Preconditions.checkState(state == State.READING);
+ byte[] r = new byte[size];
+ buf.get(r);
+ alignPosition();
+ return r;
+ }
- for(int i = 0; i < XDR.pad(size, 4); i++) {
- cursor++;
- }
- return ret;
+ public void writeFixedOpaque(byte[] src, int length) {
+ ensureFreeSpace(alignUp(length));
+ buf.put(src, 0, length);
+ writePadding();
}
- /**
- * Write a Java primitive byte array as XDR variable-length opque data.
- *
- * Definition of XDR variable-length opaque data RFC 4506:
- *
- * <pre>
- * The standard also provides for variable-length (counted) opaque data,
- * defined as a sequence of n (numbered 0 through n-1) arbitrary bytes
- * to be the number n encoded as an unsigned integer (as described
- * below), and followed by the n bytes of the sequence.
- *
- * Byte m of the sequence always precedes byte m+1 of the sequence, and
- * byte 0 of the sequence always follows the sequence's length (count).
- * If n is not a multiple of four, then the n bytes are followed by
- * enough (0 to 3) residual zero bytes, r, to make the total byte count
- * a multiple of four. Variable-length opaque data is declared in the
- * following way:
- *
- * opaque identifier<m>;
- * or
- * opaque identifier<>;
- *
- * The constant m denotes an upper bound of the number of bytes that the
- * sequence may contain. If m is not specified, as in the second
- * declaration, it is assumed to be (2**32) - 1, the maximum length.
- *
- * The constant m would normally be found in a protocol specification.
- * For example, a filing protocol may state that the maximum data
- * transfer size is 8192 bytes, as follows:
- *
- * opaque filedata<8192>;
- *
- * 0 1 2 3 4 5 ...
- * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
- * | length n |byte0|byte1|...| n-1 | 0 |...| 0 |
- * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
- * |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
- * |<----n+r (where (n+r) mod 4 = 0)---->|
- * VARIABLE-LENGTH OPAQUE
- *
- * It is an error to encode a length greater than the maximum described
- * in the specification.
- * </pre>
- */
- public void writeVariableOpaque(byte[] data) {
- this.writeInt(data.length);
- this.writeFixedOpaque(data);
+ public void writeFixedOpaque(byte[] src) {
+ writeFixedOpaque(src, src.length);
}
public byte[] readVariableOpaque() {
- int size = this.readInt();
- return size != 0 ? this.readFixedOpaque(size) : null;
+ Preconditions.checkState(state == State.READING);
+ int size = readInt();
+ return readFixedOpaque(size);
}
- public void skipVariableOpaque() {
- int length= this.readInt();
- this.skip(length+XDR.pad(length, 4));
- }
-
- /**
- * Write Java String as XDR string.
- *
- * Definition of XDR string from RFC 4506:
- *
- * <pre>
- * The standard defines a string of n (numbered 0 through n-1) ASCII
- * bytes to be the number n encoded as an unsigned integer (as described
- * above), and followed by the n bytes of the string. Byte m of the
- * string always precedes byte m+1 of the string, and byte 0 of the
- * string always follows the string's length. If n is not a multiple of
- * four, then the n bytes are followed by enough (0 to 3) residual zero
- * bytes, r, to make the total byte count a multiple of four. Counted
- * byte strings are declared as follows:
- *
- * string object<m>;
- * or
- * string object<>;
- *
- * The constant m denotes an upper bound of the number of bytes that a
- * string may contain. If m is not specified, as in the second
- * declaration, it is assumed to be (2**32) - 1, the maximum length.
- * The constant m would normally be found in a protocol specification.
- * For example, a filing protocol may state that a file name can be no
- * longer than 255 bytes, as follows:
- *
- * string filename<255>;
- *
- * 0 1 2 3 4 5 ...
- * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
- * | length n |byte0|byte1|...| n-1 | 0 |...| 0 |
- * +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
- * |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
- * |<----n+r (where (n+r) mod 4 = 0)---->|
- * STRING
- * It is an error to encode a length greater than the maximum described
- * in the specification.
- * </pre>
- */
- public void writeString(String data) {
- this.writeVariableOpaque(data.getBytes());
+ public void writeVariableOpaque(byte[] src) {
+ ensureFreeSpace(SIZEOF_INT + alignUp(src.length));
+ buf.putInt(src.length);
+ writeFixedOpaque(src);
}
public String readString() {
- return new String(this.readVariableOpaque());
+ return new String(readVariableOpaque());
}
- public void dump(PrintStream out) {
- for(int i = 0; i < bytearr.length; i += 4) {
- out.println(hex(bytearr[i]) + " " + hex(bytearr[i + 1]) + " "
- + hex(bytearr[i + 2]) + " " + hex(bytearr[i + 3]));
- }
+ public void writeString(String s) {
+ writeVariableOpaque(s.getBytes());
}
- @VisibleForTesting
- public byte[] getBytes() {
- return Arrays.copyOf(bytearr, bytearr.length);
+ private void writePadding() {
+ Preconditions.checkState(state == State.WRITING);
+ int p = pad(buf.position());
+ ensureFreeSpace(p);
+ buf.put(PADDING_BYTES, 0, p);
}
- public static byte[] append(byte[] bytes, byte[] bytesToAdd) {
- byte[] newByteArray = new byte[bytes.length + bytesToAdd.length];
- System.arraycopy(bytes, 0, newByteArray, 0, bytes.length);
- System.arraycopy(bytesToAdd, 0, newByteArray, bytes.length, bytesToAdd.length);
- return newByteArray;
+ private int alignUp(int length) {
+ return length + pad(length);
}
- private static int pad(int x, int y) {
- return x % y == 0 ? 0 : y - (x % y);
+ private int pad(int length) {
+ switch (length % 4) {
+ case 1:
+ return 3;
+ case 2:
+ return 2;
+ case 3:
+ return 1;
+ default:
+ return 0;
+ }
}
- static byte[] toBytes(int n) {
- byte[] ret = { (byte) ((n & 0xff000000) >> 24),
- (byte) ((n & 0x00ff0000) >> 16), (byte) ((n & 0x0000ff00) >> 8),
- (byte) (n & 0x000000ff) };
- return ret;
+ private void alignPosition() {
+ buf.position(alignUp(buf.position()));
}
- private static short toShort(byte b) {
- return b < 0 ? (short) (b + 256): (short) b;
+ private void ensureFreeSpace(int size) {
+ Preconditions.checkState(state == State.WRITING);
+ if (buf.remaining() < size) {
+ int newCapacity = buf.capacity() * 2;
+ int newRemaining = buf.capacity() + buf.remaining();
+
+ while (newRemaining < size) {
+ newRemaining += newCapacity;
+ newCapacity *= 2;
+ }
+
+ ByteBuffer newbuf = ByteBuffer.allocate(newCapacity);
+ buf.flip();
+ newbuf.put(buf);
+ buf = newbuf;
+ }
}
- private static String hex(byte b) {
- return "" + HEXES.charAt((b & 0xF0) >> 4) + HEXES.charAt((b & 0x0F));
+ /** Check whether the buffer has at least len bytes remaining. */
+ public static boolean verifyLength(XDR xdr, int len) {
+ return xdr.buf.remaining() >= len;
}
- private static byte[] recordMark(int size, boolean last) {
- return toBytes(!last ? size : size | 0x80000000);
+ static byte[] recordMark(int size, boolean last) {
+ byte[] b = new byte[SIZEOF_INT];
+ ByteBuffer buf = ByteBuffer.wrap(b);
+ buf.putInt(!last ? size : size | 0x80000000);
+ return b;
}
- public static byte[] getVariableOpque(byte[] data) {
- byte[] bytes = toBytes(data.length);
- return append(bytes, Arrays.copyOf(data, data.length + XDR.pad(data.length, 4)));
+ /** Write an XDR message to a TCP ChannelBuffer */
+ public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+ Preconditions.checkState(request.state == XDR.State.WRITING);
+ ByteBuffer b = request.buf.duplicate();
+ b.flip();
+ byte[] fragmentHeader = XDR.recordMark(b.limit(), last);
+ ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
+
+ // TODO: Investigate whether making a copy of the buffer is necessary.
+ return ChannelBuffers.copiedBuffer(headerBuf, b);
+ }
+
+ /** Write an XDR message to a UDP ChannelBuffer */
+ public static ChannelBuffer writeMessageUdp(XDR response) {
+ Preconditions.checkState(response.state == XDR.State.READING);
+ // TODO: Investigate whether making a copy of the buffer is necessary.
+ return ChannelBuffers.copiedBuffer(response.buf);
}
public static int fragmentSize(byte[] mark) {
- int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
- + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+ ByteBuffer b = ByteBuffer.wrap(mark);
+ int n = b.getInt();
return n & 0x7fffffff;
}
public static boolean isLastFragment(byte[] mark) {
- int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
- + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+ ByteBuffer b = ByteBuffer.wrap(mark);
+ int n = b.getInt();
return (n & 0x80000000) != 0;
}
- /** check if the rest of data has more than <len> bytes */
- public static boolean verifyLength(XDR xdr, int len) {
- return (xdr.bytearr.length - xdr.cursor) >= len;
- }
-
- /** Write an XDR message to a TCP ChannelBuffer */
- public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
- byte[] fragmentHeader = XDR.recordMark(request.bytearr.length, last);
- ChannelBuffer outBuf = ChannelBuffers.buffer(fragmentHeader.length
- + request.bytearr.length);
- outBuf.writeBytes(fragmentHeader);
- outBuf.writeBytes(request.bytearr);
- return outBuf;
- }
+ @VisibleForTesting
+ public byte[] getBytes() {
+ ByteBuffer d = asReadOnlyWrap().buffer();
+ byte[] b = new byte[d.remaining()];
+ d.get(b);
- /** Write an XDR message to a UDP ChannelBuffer */
- public static ChannelBuffer writeMessageUdp(XDR response) {
- ChannelBuffer outBuf = ChannelBuffers.buffer(response.bytearr.length);
- outBuf.writeBytes(response.bytearr);
- return outBuf;
+ return b;
}
-}
+}
\ No newline at end of file
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java Wed Oct 30 22:21:59 2013
@@ -17,10 +17,11 @@
*/
package org.apache.hadoop.portmap;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapInterface.Procedure;
/**
@@ -33,14 +34,12 @@ public class PortmapRequest {
public static XDR create(PortmapMapping mapping) {
XDR request = new XDR();
- RpcCall.write(request,
+ RpcCall call = RpcCall.getInstance(
RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
- Procedure.PMAPPROC_SET.getValue());
- request.writeInt(AuthFlavor.AUTH_NONE.getValue());
- request.writeInt(0);
- request.writeInt(0);
- request.writeInt(0);
+ Procedure.PMAPPROC_SET.getValue(), new CredentialsNone(),
+ new VerifierNone());
+ call.write(request);
return mapping.serialize(request);
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java Wed Oct 30 22:21:59 2013
@@ -22,30 +22,31 @@ import java.util.Collection;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
/**
* Helper utility for sending portmap response.
*/
public class PortmapResponse {
public static XDR voidReply(XDR xdr, int xid) {
- RpcAcceptedReply.voidReply(xdr, xid);
+ RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
return xdr;
}
public static XDR intReply(XDR xdr, int xid, int value) {
- RpcAcceptedReply.voidReply(xdr, xid);
+ RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
xdr.writeInt(value);
return xdr;
}
public static XDR booleanReply(XDR xdr, int xid, boolean value) {
- RpcAcceptedReply.voidReply(xdr, xid);
+ RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
xdr.writeBoolean(value);
return xdr;
}
public static XDR pmapList(XDR xdr, int xid, Collection<PortmapMapping> list) {
- RpcAcceptedReply.voidReply(xdr, xid);
+ RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
for (PortmapMapping mapping : list) {
System.out.println(mapping);
xdr.writeBoolean(true); // Value follows
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.portmap;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;
@@ -26,9 +25,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
-import org.jboss.netty.channel.Channel;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* An rpcbind request handler.
@@ -43,7 +48,7 @@ public class RpcProgramPortmap extends R
private final HashMap<String, PortmapMapping> map;
public RpcProgramPortmap() {
- super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+ super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
map = new HashMap<String, PortmapMapping>(256);
}
@@ -129,10 +134,15 @@ public class RpcProgramPortmap extends R
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR in = new XDR(data);
+ XDR out = new XDR();
+
if (portmapProc == Procedure.PMAPPROC_NULL) {
out = nullOp(xid, in, out);
} else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -147,10 +157,14 @@ public class RpcProgramPortmap extends R
out = getport(xid, in, out);
} else {
LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
- RpcAcceptedReply.voidReply(out, xid,
- RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+ reply.write(out);
}
- return out;
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java Wed Oct 30 22:21:59 2013
@@ -39,7 +39,7 @@ public class TestNfsTime {
t1.serialize(xdr);
// Deserialize it back
- NfsTime t2 = NfsTime.deserialize(xdr);
+ NfsTime t2 = NfsTime.deserialize(xdr.asReadOnlyWrap());
// Ensure the NfsTimes are equal
Assert.assertEquals(t1, t2);
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java Wed Oct 30 22:21:59 2013
@@ -33,7 +33,7 @@ public class TestFileHandle {
// Deserialize it back
FileHandle handle2 = new FileHandle();
- handle2.deserialize(xdr);
+ handle2.deserialize(xdr.asReadOnlyWrap());
Assert.assertEquals(handle.getFileId(), 1024);
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Wed Oct 30 22:21:59 2013
@@ -18,14 +18,18 @@
package org.apache.hadoop.oncrpc;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
+import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.junit.Test;
@@ -34,7 +38,7 @@ import org.mockito.Mockito;
public class TestFrameDecoder {
private static int port = 12345; // some random server port
- private static XDR result = null;
+ private static int resultSize;
static void testRequest(XDR request) {
SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -45,17 +49,20 @@ public class TestFrameDecoder {
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
- int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
- super(program, host, port, progNumber, lowProgVersion, highProgVersion,
- cacheSize);
+ int progNumber, int lowProgVersion, int highProgVersion) {
+ super(program, host, port, progNumber, lowProgVersion, highProgVersion);
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel) {
- // Get the final complete request and return a void response.
- result = in;
- return RpcAcceptedReply.voidReply(out, 1234);
+ protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ resultSize = info.data().readableBytes();
+ RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+ new VerifierNone());
+ XDR out = new XDR();
+ reply.write(out);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
@@ -135,42 +142,40 @@ public class TestFrameDecoder {
buf);
assertTrue(channelBuffer != null);
// Complete frame should have to total size 10+10=20
- assertTrue(channelBuffer.array().length == 20);
+ assertEquals(20, channelBuffer.readableBytes());
}
@Test
public void testFrames() {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
- "localhost", port, 100000, 1, 2, 100);
+ "localhost", port, 100000, 1, 2);
SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
tcpServer.run();
XDR xdrOut = createGetportMount();
+ int headerSize = xdrOut.size();
int bufsize = 2 * 1024 * 1024;
byte[] buffer = new byte[bufsize];
xdrOut.writeFixedOpaque(buffer);
- int requestSize = xdrOut.size();
+ int requestSize = xdrOut.size() - headerSize;
// Send the request to the server
testRequest(xdrOut);
// Verify the server got the request with right size
- assertTrue(requestSize == result.size());
+ assertEquals(requestSize, resultSize);
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// Make this a method
- RpcCall.write(xdr_out, 0, 100000, 2, procedure);
+ RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
+ new VerifierNone()).write(xdr_out);
}
static XDR createGetportMount() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
- xdr_out.writeInt(0); // AUTH_NULL
- xdr_out.writeInt(0); // cred len
- xdr_out.writeInt(0); // verifier AUTH_NULL
- xdr_out.writeInt(0); // verf len
return xdr_out;
}
/*
@@ -191,4 +196,4 @@ public class TestFrameDecoder {
* static void testDump() { XDR xdr_out = new XDR();
* createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
*/
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java Wed Oct 30 22:21:59 2013
@@ -20,8 +20,9 @@ package org.apache.hadoop.oncrpc;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.Test;
/**
@@ -45,8 +46,8 @@ public class TestRpcAcceptedReply {
@Test
public void testConstructor() {
- RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
- RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.Type.RPC_REPLY,
+ Verifier verifier = new VerifierNone();
+ RpcAcceptedReply reply = new RpcAcceptedReply(0,
ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS);
assertEquals(0, reply.getXid());
assertEquals(RpcMessage.Type.RPC_REPLY, reply.getMessageType());
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java Wed Oct 30 22:21:59 2013
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.oncrpc;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.Credentials;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.Test;
/**
@@ -28,8 +32,8 @@ public class TestRpcCall {
@Test
public void testConstructor() {
- RpcAuthInfo credential = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
- RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+ Credentials credential = new CredentialsNone();
+ Verifier verifier = new VerifierNone();
int rpcVersion = RpcCall.RPC_VERSION;
int program = 2;
int version = 3;
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java Wed Oct 30 22:21:59 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallC
import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
import org.junit.Test;
+import static org.mockito.Mockito.*;
+
/**
* Unit tests for {@link RpcCallCache}
*/
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
validateInprogressCacheEntry(e);
// Set call as completed
- XDR response = new XDR();
+ RpcResponse response = mock(RpcResponse.class);
cache.callCompleted(clientIp, xid, response);
e = cache.checkOrAddToCache(clientIp, xid);
validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
assertNull(c.getResponse());
}
- private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+ private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
assertFalse(c.isInProgress());
assertTrue(c.isCompleted());
assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
assertFalse(c.isCompleted());
assertNull(c.getResponse());
- XDR response = new XDR();
+ RpcResponse response = mock(RpcResponse.class);
c.setResponse(response);
validateCompletedCacheEntry(c, response);
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.oncrpc;
import org.apache.hadoop.oncrpc.RpcDeniedReply.RejectState;
import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.Assert;
import org.junit.Test;
@@ -39,10 +40,8 @@ public class TestRpcDeniedReply {
@Test
public void testConstructor() {
- RpcDeniedReply reply = new RpcDeniedReply(0, RpcMessage.Type.RPC_REPLY,
- ReplyState.MSG_ACCEPTED, RejectState.AUTH_ERROR) {
- // Anonymous class
- };
+ RpcDeniedReply reply = new RpcDeniedReply(0, ReplyState.MSG_ACCEPTED,
+ RejectState.AUTH_ERROR, new VerifierNone());
Assert.assertEquals(0, reply.getXid());
Assert.assertEquals(RpcMessage.Type.RPC_REPLY, reply.getMessageType());
Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java Wed Oct 30 22:21:59 2013
@@ -26,7 +26,10 @@ import org.junit.Test;
public class TestRpcMessage {
private RpcMessage getRpcMessage(int xid, RpcMessage.Type msgType) {
return new RpcMessage(xid, msgType) {
- // Anonymous class
+ @Override
+ public XDR write(XDR xdr) {
+ return null;
+ }
};
}
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.oncrpc;
import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.Assert;
import org.junit.Test;
@@ -39,8 +40,12 @@ public class TestRpcReply {
@Test
public void testRpcReply() {
- RpcReply reply = new RpcReply(0, RpcMessage.Type.RPC_REPLY, ReplyState.MSG_ACCEPTED) {
- // Anonymous class
+ RpcReply reply = new RpcReply(0, ReplyState.MSG_ACCEPTED,
+ new VerifierNone()) {
+ @Override
+ public XDR write(XDR xdr) {
+ return null;
+ }
};
Assert.assertEquals(0, reply.getXid());
Assert.assertEquals(RpcMessage.Type.RPC_REPLY, reply.getMessageType());
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java Wed Oct 30 22:21:59 2013
@@ -17,23 +17,34 @@
*/
package org.apache.hadoop.oncrpc;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
+import org.junit.Assert;
import org.junit.Test;
-/**
- * Tests for {@link XDR}
- */
public class TestXDR {
- /**
- * Test {@link XDR#append(byte[], byte[])}
- */
+ private void serializeInt(int times) {
+ XDR w = new XDR();
+ for (int i = 0; i < times; ++i)
+ w.writeInt(23);
+
+ XDR r = w.asReadOnlyWrap();
+ for (int i = 0; i < times; ++i)
+ Assert.assertEquals(r.readInt(), 23);
+ }
+
+ private void serializeLong(int times) {
+ XDR w = new XDR();
+ for (int i = 0; i < times; ++i)
+ w.writeLongAsHyper(23);
+
+ XDR r = w.asReadOnlyWrap();
+ for (int i = 0; i < times; ++i)
+ Assert.assertEquals(r.readHyper(), 23);
+ }
+
@Test
- public void testAppendBytes() {
- byte[] arr1 = new byte[] {0, 1};
- byte[] arr2 = new byte[] {2, 3};
- assertTrue(Arrays.equals(new byte[]{0, 1, 2, 3}, XDR.append(arr1, arr2)));
+ public void testPerformance() {
+ final int TEST_TIMES = 8 << 20;
+ serializeInt(TEST_TIMES);
+ serializeLong(TEST_TIMES);
}
}