You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/09/30 21:21:19 UTC

svn commit: r1527726 - in /hadoop/common/trunk/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/

Author: brandonli
Date: Mon Sep 30 19:21:17 2013
New Revision: 1527726

URL: http://svn.apache.org/r1527726
Log:
HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API. Contributed by Haohui Mai

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1527726&r1=1527725&r2=1527726&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Mon Sep 30 19:21:17 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHa
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to mountd daemon. See {@link Mountd}.
@@ -77,7 +83,7 @@ public class RpcProgramMountd extends Rp
       throws IOException {
     // Note that RPC cache is not enabled
     super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
-        PROGRAM, VERSION_1, VERSION_3, 0);
+        PROGRAM, VERSION_1, VERSION_3);
     
     this.hostsMatcher = NfsExports.getInstance(config);
     this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@@ -173,10 +179,16 @@ public class RpcProgramMountd extends Rp
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
+
     if (mntproc == MNTPROC.NULL) {
       out = nullOp(out, xid, client);
     } else if (mntproc == MNTPROC.MNT) {
@@ -198,7 +210,9 @@ public class RpcProgramMountd extends Rp
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }  
-    return out;
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1527726&r1=1527725&r2=1527726&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Mon Sep 30 19:21:17 2013
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
@@ -103,9 +104,13 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcCallCache;
 import org.apache.hadoop.oncrpc.RpcDeniedReply;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Credentials;
 import org.apache.hadoop.oncrpc.security.CredentialsSys;
@@ -115,7 +120,10 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.AccessControlException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
@@ -150,14 +158,15 @@ public class RpcProgramNfs3 extends RpcP
   private Statistics statistics;
   private String writeDumpDir; // The dir save dump files
   
+  private final RpcCallCache rpcCallCache;
+
   public RpcProgramNfs3() throws IOException {
     this(new Configuration());
   }
 
-  public RpcProgramNfs3(Configuration config)
-      throws IOException {
+  public RpcProgramNfs3(Configuration config) throws IOException {
     super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
+        Nfs3Constant.VERSION, Nfs3Constant.VERSION);
    
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup();
@@ -183,6 +192,8 @@ public class RpcProgramNfs3 extends RpcP
     } else {
       clearDirectory(writeDumpDir);
     }
+
+    rpcCallCache = new RpcCallCache("NFS3", 256);
   }
 
   private void clearDirectory(String writeDumpDir) throws IOException {
@@ -213,8 +224,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -294,8 +305,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -370,8 +381,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -432,8 +443,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -574,7 +585,6 @@ public class RpcProgramNfs3 extends RpcP
     long offset = request.getOffset();
     int count = request.getCount();
 
-    
     FileHandle handle = request.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@@ -720,8 +730,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public CREATE3Response create(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -973,8 +983,7 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
             preOpDirAttr);
@@ -1056,8 +1065,7 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
       }
@@ -1098,8 +1106,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1245,13 +1253,14 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
   @Override
-  public READDIR3Response readdir(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1540,8 +1549,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1598,8 +1607,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1650,8 +1659,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1697,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1776,25 +1785,53 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress())
+        .getAddress();
+    Channel channel = info.channel();
 
     Credentials credentials = rpcCall.getCredential();
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
-      if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
-          && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
-        LOG.info("Wrong RPC AUTH flavor, "
-            + rpcCall.getCredential().getFlavor()
+      if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
+          && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
+        LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
             + " is not AUTH_SYS or RPCSEC_GSS.");
         XDR reply = new XDR();
         RpcDeniedReply rdr = new RpcDeniedReply(xid,
             RpcReply.ReplyState.MSG_ACCEPTED,
             RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
         rdr.write(reply);
-        return reply;
+
+        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+            .buffer());
+        RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+        RpcUtil.sendRpcResponse(ctx, rsp);
+        return;
+      }
+    }
+
+    if (!isIdempotent(rpcCall)) {
+      RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
+          xid);
+      if (entry != null) { // in cache
+        if (entry.isCompleted()) {
+          LOG.info("Sending the cached reply to retransmitted request " + xid);
+          RpcUtil.sendRpcResponse(ctx, entry.getResponse());
+          return;
+        } else { // else request is in progress
+          LOG.info("Retransmitted request, transaction still in progress "
+              + xid);
+          // Ignore the request and do nothing
+          return;
+        }
       }
     }
     
@@ -1862,12 +1899,24 @@ public class RpcProgramNfs3 extends RpcP
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }
-    if (response != null) {
-      // TODO: currently we just return VerifierNone
-      out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    if (response == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No sync response, expect an async response for request XID="
+            + rpcCall.getXid());
+      }
+      return;
+    }
+    // TODO: currently we just return VerifierNone
+    out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+        .buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+
+    if (!isIdempotent(rpcCall)) {
+      rpcCallCache.callCompleted(client, xid, rsp);
     }
 
-    return out;
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1527726&r1=1527725&r2=1527726&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 30 19:21:17 2013
@@ -345,6 +345,9 @@ Release 2.1.2 - UNRELEASED
 
   NEW FEATURES
 
+    HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API.
+    (Haohui Mai via brandonli)
+
   IMPROVEMENTS
 
     HDFS-5246. Make Hadoop nfs server port and mount daemon port