You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2013/09/30 21:37:34 UTC

svn commit: r1527741 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src: main/java/org/apache/hadoop/nfs/nfs3/ main/java/org/apache/hadoop/oncrpc/ main/java/org/apache/hadoop/oncrpc/security/ main/java/org/apache/hadoop/portmap/...

Author: brandonli
Date: Mon Sep 30 19:37:33 2013
New Revision: 1527741

URL: http://svn.apache.org/r1527741
Log:
HDFS-5230. Merging change r1527726 from trunk

Added:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
      - copied unchanged from r1527726, hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
      - copied unchanged from r1527726, hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java
Removed:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java Mon Sep 30 19:37:33 2013
@@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mount.MountdBase;
 import org.apache.hadoop.oncrpc.RpcProgram;
-import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
-import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
 import org.apache.hadoop.portmap.PortmapMapping;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgram}.
@@ -72,19 +67,7 @@ public abstract class Nfs3Base {
 
   private void startTCPServer() {
     SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
-        rpcProgram, 0) {
-      @Override
-      public ChannelPipelineFactory getPipelineFactory() {
-        return new ChannelPipelineFactory() {
-          @Override
-          public ChannelPipeline getPipeline() {
-            return Channels.pipeline(
-                RpcUtil.constructRpcFrameDecoder(),
-                new SimpleTcpServerHandler(rpcProgram));
-          }
-        };
-      }
-    };
+        rpcProgram, 0);
     tcpServer.run();
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Mon Sep 30 19:37:33 2013
@@ -44,7 +44,7 @@ import com.google.common.annotations.Vis
 public class RpcCallCache {
   
   public static class CacheEntry {
-    private XDR response; // null if no response has been sent
+    private RpcResponse response; // null if no response has been sent
     
     public CacheEntry() {
       response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
       return response != null;
     }
     
-    public XDR getResponse() {
+    public RpcResponse getResponse() {
       return response;
     }
     
-    public void setResponse(XDR response) {
+    public void setResponse(RpcResponse response) {
       this.response = response;
     }
   }
@@ -128,13 +128,13 @@ public class RpcCallCache {
   }
 
   /** Mark a request as completed and add corresponding response to the cache */
-  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+  public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
     ClientRequest req = new ClientRequest(clientId, xid);
     CacheEntry e;
     synchronized(map) {
       e = map.get(req);
     }
-    e.setResponse(response);
+    e.response = response;
   }
   
   /**

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Mon Sep 30 19:37:33 2013
@@ -18,22 +18,24 @@
 package org.apache.hadoop.oncrpc;
 
 import java.io.IOException;
-import java.net.InetAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
-import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
  * Class for writing RPC server programs based on RFC 1050. Extend this class
  * and implement {@link #handleInternal} to handle the requests received.
  */
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
   private static final Log LOG = LogFactory.getLog(RpcProgram.class);
   public static final int RPCB_PORT = 111;
   private final String program;
@@ -42,7 +44,6 @@ public abstract class RpcProgram {
   private final int progNumber;
   private final int lowProgVersion;
   private final int highProgVersion;
-  private final RpcCallCache rpcCallCache;
   
   /**
    * Constructor
@@ -53,19 +54,15 @@ public abstract class RpcProgram {
    * @param progNumber program number as defined in RFC 1050
    * @param lowProgVersion lowest version of the specification supported
    * @param highProgVersion highest version of the specification supported
-   * @param cacheSize size of cache to handle duplciate requests. Size <= 0
-   *          indicates no cache.
    */
   protected RpcProgram(String program, String host, int port, int progNumber,
-      int lowProgVersion, int highProgVersion, int cacheSize) {
+      int lowProgVersion, int highProgVersion) {
     this.program = program;
     this.host = host;
     this.port = port;
     this.progNumber = progNumber;
     this.lowProgVersion = lowProgVersion;
     this.highProgVersion = highProgVersion;
-    this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
-        : null;
   }
 
   /**
@@ -103,92 +100,50 @@ public abstract class RpcProgram {
     }
   }
 
-  /**
-   * Handle an RPC request.
-   * @param rpcCall RPC call that is received
-   * @param in xdr with cursor at reading the remaining bytes of a method call
-   * @param out xdr output corresponding to Rpc reply
-   * @param client making the Rpc request
-   * @param channel connection over which Rpc request is received
-   * @return response xdr response
-   */
-  protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel);
-  
-  public XDR handle(XDR xdr, InetAddress client, Channel channel) {
-    XDR out = new XDR();
-    RpcCall rpcCall = RpcCall.read(xdr);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(program + " procedure #" + rpcCall.getProcedure());
-    }
-    
-    if (!checkProgram(rpcCall.getProgram())) {
-      return programMismatch(out, rpcCall);
-    }
-
-    if (!checkProgramVersion(rpcCall.getVersion())) {
-      return programVersionMismatch(out, rpcCall);
-    }
-    
-    // Check for duplicate requests in the cache for non-idempotent requests
-    boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
-    if (idempotent) {
-      CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
-      if (entry != null) { // in ache 
-        if (entry.isCompleted()) {
-          LOG.info("Sending the cached reply to retransmitted request "
-              + rpcCall.getXid());
-          return entry.getResponse();
-        } else { // else request is in progress
-          LOG.info("Retransmitted request, transaction still in progress "
-              + rpcCall.getXid());
-          // TODO: ignore the request?
-        }
-      }
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    RpcInfo info = (RpcInfo) e.getMessage();
+    RpcCall call = (RpcCall) info.header();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(program + " procedure #" + call.getProcedure());
     }
     
-    XDR response = handleInternal(rpcCall, xdr, out, client, channel);
-    if (response.size() == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No sync response, expect an async response for request XID="
-            + rpcCall.getXid());
-      }
+    if (this.progNumber != call.getProgram()) {
+      LOG.warn("Invalid RPC call program " + call.getProgram());
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
+    }
+
+    int ver = call.getVersion();
+    if (ver < lowProgVersion || ver > highProgVersion) {
+      LOG.warn("Invalid RPC call version " + ver);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      out.writeInt(lowProgVersion);
+      out.writeInt(highProgVersion);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
     }
     
-    // Add the request to the cache
-    if (idempotent) {
-      rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
-    }
-    return response;
-  }
-  
-  private XDR programMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call program " + call.getProgram());
-    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-        AcceptState.PROG_UNAVAIL, new VerifierNone());
-    reply.write(out);
-    return out;
-  }
-  
-  private XDR programVersionMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call version " + call.getVersion());
-    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-        AcceptState.PROG_MISMATCH, new VerifierNone());
-    reply.write(out);
-    out.writeInt(lowProgVersion);
-    out.writeInt(highProgVersion);
-    return out;
-  }
-  
-  private boolean checkProgram(int progNumber) {
-    return this.progNumber == progNumber;
-  }
-  
-  /** Return true if a the program version in rpcCall is supported */
-  private boolean checkProgramVersion(int programVersion) {
-    return programVersion >= lowProgVersion
-        && programVersion <= highProgVersion;
+    handleInternal(ctx, info);
   }
+
+  protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
   
   @Override
   public String toString() {

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Mon Sep 30 19:37:33 2013
@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
-public class RpcUtil {
+public final class RpcUtil {
   /**
-   * The XID in RPC call. It is used for starting with new seed after each reboot.
+   * The XID in RPC call. It is used for starting with new seed after each
+   * reboot.
    */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
@@ -35,10 +41,27 @@ public class RpcUtil {
     return xid = ++xid + caller.hashCode();
   }
 
+  public static void sendRpcResponse(ChannelHandlerContext ctx,
+      RpcResponse response) {
+    Channels.fireMessageReceived(ctx, response);
+  }
+
   public static FrameDecoder constructRpcFrameDecoder() {
     return new RpcFrameDecoder();
   }
 
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+  /**
+   * An RPC client can separate an RPC message into several frames (i.e.,
+   * fragments) when transferring it across the wire. RpcFrameDecoder
+   * reconstructs a full RPC message from these fragments.
+   *
+   * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+   * each RPC client.
+   */
   static class RpcFrameDecoder extends FrameDecoder {
     public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
     private ChannelBuffer currentFrame;
@@ -78,4 +101,68 @@ public class RpcUtil {
       }
     }
   }
+
+  /**
+   * RpcMessageParserStage parses the network bytes and encapsulates the RPC
+   * request into an RpcInfo instance.
+   */
+  static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+    private static final Log LOG = LogFactory
+        .getLog(RpcMessageParserStage.class);
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+      ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+      XDR in = new XDR(b, XDR.State.READING);
+
+      RpcInfo info = null;
+      try {
+        RpcCall callHeader = RpcCall.read(in);
+        ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+            .slice());
+        info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+            e.getRemoteAddress());
+      } catch (Exception exc) {
+        LOG.info("Malfromed RPC request from " + e.getRemoteAddress());
+      }
+
+      if (info != null) {
+        Channels.fireMessageReceived(ctx, info);
+      }
+    }
+  }
+
+  /**
+   * RpcTcpResponseStage sends an RpcResponse across the wire with the
+   * appropriate fragment header.
+   */
+  private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+      ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+      ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+      e.getChannel().write(d);
+    }
+  }
+
+  /**
+   * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+   * require a fragment header.
+   */
+  private static final class RpcUdpResponseStage extends
+      SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      e.getChannel().write(r.data(), r.remoteAddress());
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Mon Sep 30 19:37:33 2013
@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFa
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 /**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.ni
 public class SimpleTcpServer {
   public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   
   /** The maximum number of I/O worker threads */
   protected final int workerCount;
@@ -50,18 +50,6 @@ public class SimpleTcpServer {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workercount;
-    this.pipelineFactory = getPipelineFactory();
-  }
-
-  public ChannelPipelineFactory getPipelineFactory() {
-    return new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(
-            RpcUtil.constructRpcFrameDecoder(),
-            new SimpleTcpServerHandler(rpcProgram));
-      }
-    };
   }
   
   public void run() {
@@ -78,7 +66,15 @@ public class SimpleTcpServer {
     }
     
     ServerBootstrap bootstrap = new ServerBootstrap(factory);
-    bootstrap.setPipelineFactory(pipelineFactory);
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_TCP_RESPONSE);
+      }
+    });
     bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setOption("child.keepAlive", true);
     

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Mon Sep 30 19:37:33 2013
@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
   private final int RECEIVE_BUFFER_SIZE = 65536;
 
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   protected final int workerCount;
 
-  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workerCount;
-    this.pipelineFactory = new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
-      }
-    };
   }
 
   public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
         Executors.newCachedThreadPool(), workerCount);
 
     ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-    ChannelPipeline p = b.getPipeline();
-    p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+    b.setPipeline(Channels.pipeline(
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_UDP_RESPONSE));
 
     b.setOption("broadcast", "false");
     b.setOption("sendBufferSize", SEND_BUFFER_SIZE);

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java Mon Sep 30 19:37:33 2013
@@ -93,6 +93,10 @@ public final class XDR {
     return n;
   }
 
+  public ByteBuffer buffer() {
+    return buf.duplicate();
+  }
+
   public int size() {
     // TODO: This overloading intends to be compatible with the semantics of
     // the previous version of the class. This function should be separated into
@@ -219,7 +223,7 @@ public final class XDR {
     return xdr.buf.remaining() >= len;
   }
 
-  private static byte[] recordMark(int size, boolean last) {
+  static byte[] recordMark(int size, boolean last) {
     byte[] b = new byte[SIZEOF_INT];
     ByteBuffer buf = ByteBuffer.wrap(b);
     buf.putInt(!last ? size : size | 0x80000000);
@@ -259,9 +263,8 @@ public final class XDR {
 
   @VisibleForTesting
   public byte[] getBytes() {
-    ByteBuffer d = buf.duplicate();
-    byte[] b = new byte[d.position()];
-    d.flip();
+    ByteBuffer d = asReadOnlyWrap().buffer();
+    byte[] b = new byte[d.remaining()];
     d.get(b);
 
     return b;

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java Mon Sep 30 19:37:33 2013
@@ -18,16 +18,17 @@
 package org.apache.hadoop.oncrpc.security;
 
 import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
 
 /**
  * Base class for verifier. Currently our authentication only supports 3 types
- * of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS},
- * and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
+ * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS},
+ * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
  * AUTH_NONE and RPCSEC_GSS
  */
 public abstract class Verifier extends RpcAuthInfo {
 
+  public static final Verifier VERIFIER_NONE = new VerifierNone();
+
   protected Verifier(AuthFlavor flavor) {
     super(flavor);
   }
@@ -61,6 +62,4 @@ public abstract class Verifier extends R
     }
     verifier.write(xdr);
   }  
- 
-  
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Mon Sep 30 19:37:33 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.portmap;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -26,10 +25,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * An rpcbind request handler.
@@ -44,7 +48,7 @@ public class RpcProgramPortmap extends R
   private final HashMap<String, PortmapMapping> map;
 
   public RpcProgramPortmap() {
-    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
     map = new HashMap<String, PortmapMapping>(256);
   }
 
@@ -130,10 +134,15 @@ public class RpcProgramPortmap extends R
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR in = new XDR(data);
+    XDR out = new XDR();
+
     if (portmapProc == Procedure.PMAPPROC_NULL) {
       out = nullOp(xid, in, out);
     } else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -148,11 +157,14 @@ public class RpcProgramPortmap extends R
       out = getport(xid, in, out);
     } else {
       LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
-      RpcAcceptedReply.getInstance(xid,
-          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
-          out);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+      reply.write(out);
     }
-    return out;
+
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Mon Sep 30 19:37:33 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
@@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.junit.Test;
@@ -38,7 +38,7 @@ import org.mockito.Mockito;
 public class TestFrameDecoder {
 
   private static int port = 12345; // some random server port
-  private static XDR result = null;
+  private static int resultSize;
 
   static void testRequest(XDR request) {
     SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -49,18 +49,20 @@ public class TestFrameDecoder {
   static class TestRpcProgram extends RpcProgram {
 
     protected TestRpcProgram(String program, String host, int port,
-        int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
-      super(program, host, port, progNumber, lowProgVersion, highProgVersion,
-          cacheSize);
+        int progNumber, int lowProgVersion, int highProgVersion) {
+      super(program, host, port, progNumber, lowProgVersion, highProgVersion);
     }
 
     @Override
-    public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-        InetAddress client, Channel channel) {
-      // Get the final complete request and return a void response.
-      result = in;
-      RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out);
-      return out;
+    protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+      resultSize = info.data().readableBytes();
+      RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+          new VerifierNone());
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
     }
 
     @Override
@@ -147,21 +149,22 @@ public class TestFrameDecoder {
   public void testFrames() {
 
     RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
-        "localhost", port, 100000, 1, 2, 100);
+        "localhost", port, 100000, 1, 2);
     SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
     tcpServer.run();
 
     XDR xdrOut = createGetportMount();
+    int headerSize = xdrOut.size();
     int bufsize = 2 * 1024 * 1024;
     byte[] buffer = new byte[bufsize];
     xdrOut.writeFixedOpaque(buffer);
-    int requestSize = xdrOut.size();
+    int requestSize = xdrOut.size() - headerSize;
 
     // Send the request to the server
     testRequest(xdrOut);
 
     // Verify the server got the request with right size
-    assertTrue(requestSize == result.size());
+    assertEquals(requestSize, resultSize);
   }
 
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {
@@ -173,10 +176,6 @@ public class TestFrameDecoder {
   static XDR createGetportMount() {
     XDR xdr_out = new XDR();
     createPortmapXDRheader(xdr_out, 3);
-    xdr_out.writeInt(0); // AUTH_NULL
-    xdr_out.writeInt(0); // cred len
-    xdr_out.writeInt(0); // verifier AUTH_NULL
-    xdr_out.writeInt(0); // verf len
     return xdr_out;
   }
   /*

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java?rev=1527741&r1=1527740&r2=1527741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java Mon Sep 30 19:37:33 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallC
 import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
 import org.junit.Test;
 
+import static org.mockito.Mockito.*;
+
 /**
  * Unit tests for {@link RpcCallCache}
  */
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
     validateInprogressCacheEntry(e);
     
     // Set call as completed
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     cache.callCompleted(clientIp, xid, response);
     e = cache.checkOrAddToCache(clientIp, xid);
     validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
     assertNull(c.getResponse());
   }
   
-  private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+  private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
     assertFalse(c.isInProgress());
     assertTrue(c.isCompleted());
     assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
     assertFalse(c.isCompleted());
     assertNull(c.getResponse());
     
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     c.setResponse(response);
     validateCompletedCacheEntry(c, response);
   }