You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2013/06/07 23:45:07 UTC

svn commit: r1490845 [1/2] - in /hadoop/common/trunk/hadoop-common-project: ./ hadoop-common/ hadoop-nfs/ hadoop-nfs/src/ hadoop-nfs/src/main/ hadoop-nfs/src/main/java/ hadoop-nfs/src/main/java/org/ hadoop-nfs/src/main/java/org/apache/ hadoop-nfs/src/m...

Author: brandonli
Date: Fri Jun  7 21:45:06 2013
New Revision: 1490845

URL: http://svn.apache.org/r1490845
Log:
HADOOP-9509. Implement ONCRPC and XDR. Contributed by Brandon Li

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/   (with props)
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/pom.xml

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1490845&r1=1490844&r2=1490845&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Jun  7 21:45:06 2013
@@ -12,6 +12,8 @@ Trunk (Unreleased)
     HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
     hadoop client processes. (Yu Gao via llu)
 
+    HADOOP-9509. Implement ONCRPC and XDR. (brandonli)
+
   IMPROVEMENTS
 
     HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution

Propchange: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Jun  7 21:45:06 2013
@@ -0,0 +1,5 @@
+.classpath
+.git
+.project
+.settings
+target

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt Fri Jun  7 21:45:06 2013
@@ -0,0 +1,10 @@
+Hadoop NFS
+
+Hadoop NFS is a Java library for building an NFS gateway. It has
+the following components:
+
+- ONCRPC: This is an implementation of ONCRPC(RFC-5531) and XDR(RFC-4506).
+- Mount: This is an interface implementation of MOUNT protocol (RFC-1813).
+- Portmap: This is an implementation of Binding protocol(RFC-1833).
+- NFSv3: This is an interface implementation of NFSv3 protocol(RFC-1813).
+

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml Fri Jun  7 21:45:06 2013
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-nfs</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>Apache Hadoop NFS</name>
+  <description>Apache Hadoop NFS library</description>
+
+  <properties>
+    <maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
+    <kerberos.realm>LOCALHOST</kerberos.realm>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <!-- Used, even though 'mvn dependency:analyze' doesn't find it -->
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <!-- Test-only: keep junit off the compile classpath of consumers -->
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <!-- Test-only: keep mockito off the compile classpath of consumers -->
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.8.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.6.2.Final</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>11.0.2</version>
+    </dependency>
+  </dependencies>
+</project>

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+
+/**
+ * A simple client that registers an RPC program with portmap.
+ */
+public class RegistrationClient extends SimpleTcpClient {
+  public static final Log LOG = LogFactory.getLog(RegistrationClient.class);
+
+  /**
+   * Creates a registration client; all arguments are handed unchanged to
+   * {@link SimpleTcpClient}.
+   *
+   * @param host host to connect to
+   * @param port port to connect to
+   * @param request XDR-encoded request to send
+   */
+  public RegistrationClient(String host, int port, XDR request) {
+    super(host, port, request);
+  }
+
+  /**
+   * Handler to handle response from the server.
+   */
+  static class RegistrationClientHandler extends SimpleTcpClientHandler {
+    public RegistrationClientHandler(XDR request) {
+      super(request);
+    }
+
+    /**
+     * Checks that the response is long enough to be a successful reply.
+     *
+     * @param len number of readable bytes in the response
+     * @return false (after a debug log) if the response is shorter than 28
+     *         bytes, true otherwise
+     */
+    private boolean validMessageLength(int len) {
+      // 28 bytes is the minimal success response size (portmapV2)
+      if (len < 28) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Portmap mapping registration failed,"
+              + " the response size is less than 28 bytes:" + len);
+        }
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Decodes the reply, dispatches it to the matching handle() overload and
+     * closes the channel. The channel is closed on every exit path,
+     * including when decoding throws.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+      try {
+        ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
+        int length = buf.readableBytes();
+        if (!validMessageLength(length)) {
+          return;
+        }
+
+        // Copy the readable bytes out of the buffer rather than reading
+        // buf.array() from index 0: the readable region does not have to
+        // start at index 0 of the backing array, and a buffer is not
+        // required to expose a backing array at all.
+        byte[] message = new byte[length];
+        buf.getBytes(buf.readerIndex(), message);
+
+        // handling fragment header for TCP, 4 bytes.
+        byte[] fragmentHeader = Arrays.copyOfRange(message, 0, 4);
+        int fragmentSize = XDR.fragmentSize(fragmentHeader);
+        boolean isLast = XDR.isLastFragment(fragmentHeader);
+        assert (fragmentSize == 28 && isLast == true);
+
+        XDR xdr = new XDR();
+        xdr.writeFixedOpaque(Arrays.copyOfRange(message, 4, length));
+
+        RpcReply reply = RpcReply.read(xdr);
+        if (reply.getState() == RpcReply.ReplyState.MSG_ACCEPTED) {
+          RpcAcceptedReply acceptedReply = (RpcAcceptedReply) reply;
+          handle(acceptedReply, xdr);
+        } else {
+          RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
+          handle(deniedReply);
+        }
+      } finally {
+        e.getChannel().close(); // shutdown now that request is complete
+      }
+    }
+
+    /** Logs a warning for a denied registration request. */
+    private void handle(RpcDeniedReply deniedReply) {
+      LOG.warn("Portmap mapping registration request was denied , " + 
+          deniedReply);
+    }
+
+    /**
+     * Reads the boolean result that follows the accepted-reply header and
+     * logs the outcome: a warning if it is false, an info message if true.
+     */
+    private void handle(RpcAcceptedReply acceptedReply, XDR xdr) {
+      AcceptState acceptState = acceptedReply.getAcceptState();
+      assert (acceptState == AcceptState.SUCCESS);
+      boolean answer = xdr.readBoolean();
+      if (!answer) {
+        LOG.warn("Portmap mapping registration failed, accept state:"
+            + acceptState);
+        // Do not fall through to the success message below.
+        return;
+      }
+      LOG.info("Portmap mapping registration succeeded");
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/** 
+ * Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate success of the request.
+ */
+public class RpcAcceptedReply extends RpcReply {
+  public enum AcceptState {
+    SUCCESS(0), /* RPC executed successfully */
+    PROG_UNAVAIL(1), /* remote hasn't exported program */
+    PROG_MISMATCH(2), /* remote can't support version # */
+    PROC_UNAVAIL(3), /* program can't support procedure */
+    GARBAGE_ARGS(4), /* procedure can't decode params */
+    SYSTEM_ERR(5); /* e.g. memory allocation failure */
+    
+    private final int value;
+
+    AcceptState(int value) {
+      this.value = value;
+    }
+
+    public static AcceptState fromValue(int value) {
+      return values()[value];
+    }
+
+    public int getValue() {
+      return value;
+    }
+  };
+
+  private final RpcAuthInfo verifier;
+  private final AcceptState acceptState;
+
+  RpcAcceptedReply(int xid, int messageType, ReplyState state,
+      RpcAuthInfo verifier, AcceptState acceptState) {
+    super(xid, messageType, state);
+    this.verifier = verifier;
+    this.acceptState = acceptState;
+  }
+
+  public static RpcAcceptedReply read(int xid, int messageType,
+      ReplyState replyState, XDR xdr) {
+    RpcAuthInfo verifier = RpcAuthInfo.read(xdr);
+    AcceptState acceptState = AcceptState.fromValue(xdr.readInt());
+    return new RpcAcceptedReply(xid, messageType, replyState, verifier,
+        acceptState);
+  }
+
+  public RpcAuthInfo getVerifier() {
+    return verifier;
+  }
+
+  public AcceptState getAcceptState() {
+    return acceptState;
+  }
+  
+  public static XDR voidReply(XDR xdr, int xid) {
+    return voidReply(xdr, xid, AcceptState.SUCCESS);
+  }
+  
+  public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) {
+    xdr.writeInt(xid);
+    xdr.writeInt(RpcMessage.RPC_REPLY);
+    xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue());
+    xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+    xdr.writeVariableOpaque(new byte[0]);
+    xdr.writeInt(acceptState.getValue());
+    return xdr;
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+/**
+ * Authentication information (flavor plus opaque body) as defined in
+ * RFC 1831.
+ */
+public class RpcAuthInfo {
+  /** Different types of authentication as defined in RFC 1831 */
+  public enum AuthFlavor {
+    AUTH_NONE(0),
+    AUTH_SYS(1),
+    AUTH_SHORT(2),
+    AUTH_DH(3),
+    RPCSEC_GSS(6);
+
+    private final int value;
+
+    AuthFlavor(int value) {
+      this.value = value;
+    }
+
+    /** @return the integer value of this flavor */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Finds the flavor whose integer value equals {@code value}.
+     *
+     * @throws IllegalArgumentException if no flavor has that value
+     */
+    static AuthFlavor fromValue(int value) {
+      AuthFlavor match = null;
+      for (AuthFlavor candidate : values()) {
+        if (candidate.value == value) {
+          match = candidate;
+          break;
+        }
+      }
+      if (match == null) {
+        throw new IllegalArgumentException("Invalid AuthFlavor value " + value);
+      }
+      return match;
+    }
+  }
+
+  private final AuthFlavor flavor;
+  private final byte[] body;
+
+  protected RpcAuthInfo(AuthFlavor flavor, byte[] body) {
+    this.flavor = flavor;
+    this.body = body;
+  }
+
+  /**
+   * Reads the flavor as an int followed by the body as a variable-length
+   * opaque from {@code xdr}.
+   */
+  public static RpcAuthInfo read(XDR xdr) {
+    int flavorValue = xdr.readInt();
+    AuthFlavor authFlavor = AuthFlavor.fromValue(flavorValue);
+    byte[] opaqueBody = xdr.readVariableOpaque();
+    return new RpcAuthInfo(authFlavor, opaqueBody);
+  }
+
+  public AuthFlavor getFlavor() {
+    return flavor;
+  }
+
+  /** @return a copy of the body, so callers cannot modify the stored array */
+  public byte[] getBody() {
+    return Arrays.copyOf(body, body.length);
+  }
+
+  @Override
+  public String toString() {
+    return "(AuthFlavor:" + flavor + ")";
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * AUTH_SYS as defined in RFC 1831
+ */
+public class RpcAuthSys {
+  private final int uid;
+  private final int gid;
+
+  public RpcAuthSys(int uid, int gid) {
+    this.uid = uid;
+    this.gid = gid;
+  }
+  
+  public static RpcAuthSys from(byte[] credentials) {
+    XDR sys = new XDR(credentials);
+    sys.skip(4); // Stamp
+    sys.skipVariableOpaque(); // Machine name
+    return new RpcAuthSys(sys.readInt(), sys.readInt());
+  }
+  
+  public int getUid() {
+    return uid;
+  }
+
+  public int getGid() {
+    return gid;
+  }
+
+  @Override
+  public String toString() {
+    return "(AuthSys: uid=" + uid + " gid=" + gid + ")";
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Represents an RPC message of type RPC call as defined in RFC 1831
+ */
+public class RpcCall extends RpcMessage {
+  public static final int RPC_VERSION = 2;
+  private static final Log LOG = LogFactory.getLog(RpcCall.class);
+  private final int rpcVersion;
+  private final int program;
+  private final int version;
+  private final int procedure;
+  private final RpcAuthInfo credential;
+  private final RpcAuthInfo verifier;
+
+  protected RpcCall(int xid, int messageType, int rpcVersion, int program,
+      int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) {
+    super(xid, messageType);
+    this.rpcVersion = rpcVersion;
+    this.program = program;
+    this.version = version;
+    this.procedure = procedure;
+    this.credential = credential;
+    this.verifier = verifier;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this);
+    }
+    validate();
+  }
+  
+  private void validateRpcVersion() {
+    if (rpcVersion != RPC_VERSION) {
+      throw new IllegalArgumentException("RPC version is expected to be "
+          + RPC_VERSION + " but got " + rpcVersion);
+    }
+  }
+  
+  public void validate() {
+    validateMessageType(RPC_CALL);
+    validateRpcVersion();
+    // Validate other members
+    // Throw exception if validation fails
+  }
+
+
+  public int getRpcVersion() {
+    return rpcVersion;
+  }
+
+  public int getProgram() {
+    return program;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public int getProcedure() {
+    return procedure;
+  }
+  
+  public RpcAuthInfo getCredential() {
+    return credential;
+  }
+
+  public RpcAuthInfo getVerifier() {
+    return verifier;
+  }
+  
+  public static RpcCall read(XDR xdr) {
+    return new RpcCall(xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(),
+        xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr),
+        RpcAuthInfo.read(xdr));
+  }
+  
+  public static void write(XDR out, int xid, int program, int progVersion,
+      int procedure) {
+    out.writeInt(xid);
+    out.writeInt(RpcMessage.RPC_CALL);
+    out.writeInt(2);
+    out.writeInt(program);
+    out.writeInt(progVersion);
+    out.writeInt(procedure);
+  }
+  
+  @Override
+  public String toString() {
+    return String.format("Xid:%d, messageType:%d, rpcVersion:%d, program:%d,"
+        + " version:%d, procedure:%d, credential:%s, verifier:%s", xid,
+        messageType, rpcVersion, program, version, procedure,
+        credential.toString(), verifier.toString());
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for handling duplicate <em>non-idempotent</em> Rpc
+ * calls. A non-idempotent request is processed as follows:
+ * <ul>
+ * <li>If the request is being processed for the first time, its state is
+ * in-progress in cache.</li>
+ * <li>If the request is retransmitted and is in-progress state, it is ignored.
+ * </li>
+ * <li>If the request is retransmitted and is completed, the previous response
+ * from the cache is sent back to the client.</li>
+ * </ul>
+ * <br>
+ * A request is identified by the client ID (address of the client) and
+ * transaction ID (xid) from the Rpc call.
+ * <p>
+ * The cache is bounded: once more than {@code maxEntries} requests are
+ * tracked, the entry that was inserted first is dropped. Access to the
+ * underlying map is guarded by the map's own monitor.
+ */
+public class RpcCallCache {
+
+  /** State of one tracked request: in progress until a response is set. */
+  public static class CacheEntry {
+    // null if no response has been sent. Volatile because the response is
+    // published by callCompleted() and read by other callers without holding
+    // the cache lock.
+    private volatile XDR response;
+
+    public CacheEntry() {
+      response = null;
+    }
+
+    /** @return true while no response has been recorded */
+    public boolean isInProgress() {
+      return response == null;
+    }
+
+    /** @return true once a response has been recorded */
+    public boolean isCompleted() {
+      return response != null;
+    }
+
+    /** @return the recorded response, or null if still in progress */
+    public XDR getResponse() {
+      return response;
+    }
+
+    public void setResponse(XDR response) {
+      this.response = response;
+    }
+  }
+
+  /**
+   * Call that is used to track a client in the {@link RpcCallCache}
+   */
+  public static class ClientRequest {
+    protected final InetAddress clientId;
+    protected final int xid;
+
+    public InetAddress getClientId() {
+      return clientId;
+    }
+
+    public ClientRequest(InetAddress clientId, int xid) {
+      this.clientId = clientId;
+      this.xid = xid;
+    }
+
+    @Override
+    public int hashCode() {
+      return xid + clientId.hashCode() * 31;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      // instanceof is false for null, so no separate null check is needed
+      if (!(obj instanceof ClientRequest)) {
+        return false;
+      }
+      ClientRequest other = (ClientRequest) obj;
+      return clientId.equals(other.clientId) && (xid == other.xid);
+    }
+  }
+
+  private final String program;
+
+  private final Map<ClientRequest, CacheEntry> map;
+
+  /**
+   * @param program program name this cache belongs to
+   * @param maxEntries maximum number of tracked requests, must be &gt; 0
+   * @throws IllegalArgumentException if {@code maxEntries} is not positive
+   */
+  public RpcCallCache(final String program, final int maxEntries) {
+    if (maxEntries <= 0) {
+      throw new IllegalArgumentException("Cache size is " + maxEntries
+          + ". Should be > 0");
+    }
+    this.program = program;
+    map = new LinkedHashMap<ClientRequest, CacheEntry>() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      protected boolean removeEldestEntry(
+          java.util.Map.Entry<ClientRequest, CacheEntry> eldest) {
+        // size() here is the map's own size, checked after the insertion
+        return size() > maxEntries;
+      }
+    };
+  }
+
+  /** Return the program name */
+  public String getProgram() {
+    return program;
+  }
+
+  /**
+   * Mark a request as completed and add corresponding response to the cache.
+   * If the request is no longer tracked (for example because it has been
+   * evicted in the meantime, or was never added) the call is a no-op.
+   */
+  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+    ClientRequest req = new ClientRequest(clientId, xid);
+    CacheEntry e;
+    synchronized(map) {
+      e = map.get(req);
+    }
+    if (e == null) {
+      // Nothing to complete; previously this raised a NullPointerException.
+      return;
+    }
+    e.setResponse(response);
+  }
+
+  /**
+   * Check the cache for an entry. If it does not exist, add the request
+   * as in progress.
+   *
+   * @return the existing entry, or null if the request was not tracked yet
+   *         (in which case a new in-progress entry has just been added)
+   */
+  public CacheEntry checkOrAddToCache(InetAddress clientId, int xid) {
+    ClientRequest req = new ClientRequest(clientId, xid);
+    CacheEntry e;
+    synchronized(map) {
+      e = map.get(req);
+      if (e == null) {
+        // Add an inprogress cache entry
+        map.put(req, new CacheEntry());
+      }
+    }
+    return e;
+  }
+
+  /** Return number of cached entries */
+  public int size() {
+    synchronized(map) {
+      return map.size();
+    }
+  }
+
+  /**
+   * Iterator to the cache entries. The iterator is not protected against
+   * concurrent modification of the cache.
+   * @return iterator
+   */
+  @VisibleForTesting
+  public Iterator<Entry<ClientRequest, CacheEntry>> iterator() {
+    return map.entrySet().iterator();
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/**
+ * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate failure of the request.
+ */
+public class RpcDeniedReply extends RpcReply {
+  /** Reason why the call was rejected. */
+  public enum RejectState {
+    RPC_MISMATCH(0), AUTH_ERROR(1);
+
+    private final int value;
+
+    RejectState(int value) {
+      this.value = value;
+    }
+
+    int getValue() {
+      return value;
+    }
+
+    /**
+     * Map a wire value to its constant.
+     * @throws ArrayIndexOutOfBoundsException if the value is not 0 or 1
+     */
+    static RejectState fromValue(int value) {
+      return values()[value];
+    }
+  }
+
+  private final RejectState rejectState;
+
+  RpcDeniedReply(int xid, int messageType, ReplyState replyState,
+      RejectState rejectState) {
+    super(xid, messageType, replyState);
+    this.rejectState = rejectState;
+  }
+
+  /**
+   * Read the body of a denied reply. The xid, message type and reply state
+   * have already been consumed by the caller; the next integer in
+   * {@code xdr} is the reject state.
+   */
+  public static RpcDeniedReply read(int xid, int messageType,
+      ReplyState replyState, XDR xdr) {
+    RejectState rejectState = RejectState.fromValue(xdr.readInt());
+    return new RpcDeniedReply(xid, messageType, replyState, rejectState);
+  }
+
+  public RejectState getRejectState() {
+    return rejectState;
+  }
+
+  @Override
+  public String toString() {
+    // Fields are comma separated (the separator before rejectState used to
+    // be missing).
+    return new StringBuilder().append("xid:").append(xid)
+        .append(",messageType:").append(messageType).append(",rejectState:")
+        .append(rejectState).toString();
+  }
+
+  /**
+   * Write a denied reply header into {@code xdr}: xid, message type, reply
+   * state and reject state, which is the layout {@link #read} expects.
+   * <p>
+   * A rejected reply carries no verifier (RFC 1831, {@code rejected_reply}),
+   * so none is written. The data that follows the reject state (version
+   * range for RPC_MISMATCH, auth status for AUTH_ERROR) is not written by
+   * this method.
+   *
+   * @param xdr stream to append to
+   * @param xid transaction id of the request being answered
+   * @param msgAccepted reply state to write; despite the parameter name this
+   *          is expected to be {@code MSG_DENIED} for a denied reply
+   * @param rejectState reason for the rejection
+   * @return the same {@code xdr} instance
+   */
+  public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted,
+      RejectState rejectState) {
+    xdr.writeInt(xid);
+    xdr.writeInt(RpcMessage.RPC_REPLY);
+    xdr.writeInt(msgAccepted.getValue());
+    // No verifier in a denied reply
+    xdr.writeInt(rejectState.getValue());
+    return xdr;
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * {@link FrameDecoder} for RPC messages. Each fragment on the stream starts
+ * with a 4 byte header that is interpreted by {@link XDR#fragmentSize} and
+ * {@link XDR#isLastFragment}; fragments are accumulated in {@link #frame}
+ * until the last fragment of a message has been received.
+ */
+public class RpcFrameDecoder extends FrameDecoder {
+  public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+  // Fragments of the message received so far; null between messages.
+  private ChannelBuffer frame;
+
+  /**
+   * Decode an RPC message received on the socket.
+   * @return the complete message once its last fragment has been received,
+   *         or null if the header or fragment is incomplete or more
+   *         fragments are expected.
+   */
+  @Override
+  protected Object decode(ChannelHandlerContext ctx, Channel channel,
+      ChannelBuffer buf) {
+
+    // Make sure the 4 byte fragment header has been received.
+    if (buf.readableBytes() < 4) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Length field is not received yet");
+      }
+      return null;
+    }
+
+    // Note the index and go back to it when an incomplete message is received
+    buf.markReaderIndex();
+
+    // Read the record marking.
+    // NOTE(review): array() presumably requires a heap-backed buffer here;
+    // confirm the channel's buffer factory never hands out direct buffers.
+    ChannelBuffer fragmentHeader = buf.readBytes(4);
+    int length = XDR.fragmentSize(fragmentHeader.array());
+    boolean isLast = XDR.isLastFragment(fragmentHeader.array());
+
+    // Make sure the whole fragment body is available in the buffer.
+    if (buf.readableBytes() < length) {
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(length + " bytes are not received yet");
+      }
+      buf.resetReaderIndex(); // Go back to the right reader index
+      return null;
+    }
+
+    if (frame == null) {
+      // First fragment of a message
+      frame = buf.readBytes(length);
+    } else {
+      // Append this fragment to the fragments collected so far
+      ChannelBuffer tmp = ChannelBuffers.copiedBuffer(frame.array(), buf
+          .readBytes(length).array());
+      frame = tmp;
+    }
+
+    // Successfully decoded a frame. Return the decoded frame if the frame is
+    // the last one. Otherwise, wait for the next frame.
+    if (isLast) {
+      ChannelBuffer completeFrame = frame;
+      frame = null; // start a fresh message on the next call
+      return completeFrame;
+    } else {
+      LOG.info("Wait for the next frame. This rarely happens.");
+      return null;
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Base type for the RPC messages defined in RFC 1831. Every message carries
+ * a transaction id and a message type, which is either {@link #RPC_CALL} or
+ * {@link #RPC_REPLY}.
+ */
+public abstract class RpcMessage {
+  public static final int RPC_CALL = 0;
+  public static final int RPC_REPLY = 1;
+
+  protected final int xid;
+  protected final int messageType;
+
+  /**
+   * @param xid transaction id
+   * @param messageType {@link #RPC_CALL} or {@link #RPC_REPLY}
+   * @throws IllegalArgumentException for any other message type
+   */
+  RpcMessage(int xid, int messageType) {
+    final boolean knownType =
+        messageType == RPC_CALL || messageType == RPC_REPLY;
+    if (!knownType) {
+      throw new IllegalArgumentException("Invalid message type " + messageType);
+    }
+    this.messageType = messageType;
+    this.xid = xid;
+  }
+
+  /** @return the transaction id */
+  public int getXid() {
+    return this.xid;
+  }
+
+  /** @return the message type */
+  public int getMessageType() {
+    return this.messageType;
+  }
+
+  /**
+   * Verify that this message has the expected type.
+   *
+   * @param expected message type the caller requires
+   * @throws IllegalArgumentException if the actual type differs
+   */
+  protected void validateMessageType(int expected) {
+    if (messageType == expected) {
+      return;
+    }
+    throw new IllegalArgumentException("Message type is expected to be "
+        + expected + " but got " + messageType);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.portmap.PortmapMapping;
+import org.apache.hadoop.portmap.PortmapRequest;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Class for writing RPC server programs based on RFC 1050. Extend this class
+ * and implement {@link #handleInternal} to handle the requests received.
+ */
+public abstract class RpcProgram {
+  private static final Log LOG = LogFactory.getLog(RpcProgram.class);
+  public static final int RPCB_PORT = 111;
+  private final String program;
+  private final String host;
+  private final int port;
+  private final int progNumber;
+  private final int lowProgVersion;
+  private final int highProgVersion;
+  private final RpcCallCache rpcCallCache;
+  
+  /**
+   * Constructor
+   * 
+   * @param program program name
+   * @param host host where the Rpc server program is started
+   * @param port port where the Rpc server program is listening to
+   * @param progNumber program number as defined in RFC 1050
+   * @param lowProgVersion lowest version of the specification supported
+   * @param highProgVersion highest version of the specification supported
+   * @param cacheSize size of cache to handle duplciate requests. Size <= 0
+   *          indicates no cache.
+   */
+  protected RpcProgram(String program, String host, int port, int progNumber,
+      int lowProgVersion, int highProgVersion, int cacheSize) {
+    this.program = program;
+    this.host = host;
+    this.port = port;
+    this.progNumber = progNumber;
+    this.lowProgVersion = lowProgVersion;
+    this.highProgVersion = highProgVersion;
+    this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
+        : null;
+  }
+
+  /**
+   * Register this program with the local portmapper.
+   */
+  public void register(int transport) {
+    // Register all the program versions with portmapper for a given transport
+    for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
+      register(vers, transport);
+    }
+  }
+  
+  /**
+   * Register this program with the local portmapper.
+   */
+  private void register(int progVersion, int transport) {
+    PortmapMapping mapEntry = new PortmapMapping(progNumber, progVersion,
+        transport, port);
+    register(mapEntry);
+  }
+  
+  /**
+   * Register the program with Portmap or Rpcbind
+   */
+  protected void register(PortmapMapping mapEntry) {
+    XDR mappingRequest = PortmapRequest.create(mapEntry);
+    SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
+        mappingRequest);
+    try {
+      registrationClient.run();
+    } catch (IOException e) {
+      LOG.error("Registration failure with " + host + ":" + port
+          + ", portmap entry: " + mapEntry);
+      throw new RuntimeException("Registration failure");
+    }
+  }
+
+  /**
+   * Handle an RPC request.
+   * @param rpcCall RPC call that is received
+   * @param in xdr with cursor at reading the remaining bytes of a method call
+   * @param out xdr output corresponding to Rpc reply
+   * @param client making the Rpc request
+   * @param channel connection over which Rpc request is received
+   * @return response xdr response
+   */
+  protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+      InetAddress client, Channel channel);
+  
+  public XDR handle(XDR xdr, InetAddress client, Channel channel) {
+    XDR out = new XDR();
+    RpcCall rpcCall = RpcCall.read(xdr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(program + " procedure #" + rpcCall.getProcedure());
+    }
+    
+    if (!checkProgram(rpcCall.getProgram())) {
+      return programMismatch(out, rpcCall);
+    }
+
+    if (!checkProgramVersion(rpcCall.getVersion())) {
+      return programVersionMismatch(out, rpcCall);
+    }
+    
+    // Check for duplicate requests in the cache for non-idempotent requests
+    boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
+    if (idempotent) {
+      CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
+      if (entry != null) { // in ache 
+        if (entry.isCompleted()) {
+          LOG.info("Sending the cached reply to retransmitted request "
+              + rpcCall.getXid());
+          return entry.getResponse();
+        } else { // else request is in progress
+          LOG.info("Retransmitted request, transaction still in progress "
+              + rpcCall.getXid());
+          // TODO: ignore the request?
+        }
+      }
+    }
+    
+    XDR response = handleInternal(rpcCall, xdr, out, client, channel);
+    if (response.size() == 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No sync response, expect an async response for request XID="
+            + rpcCall.getXid());
+      }
+    }
+    
+    // Add the request to the cache
+    if (idempotent) {
+      rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
+    }
+    return response;
+  }
+  
+  private XDR programMismatch(XDR out, RpcCall call) {
+    LOG.warn("Invalid RPC call program " + call.getProgram());
+    RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL);
+    return out;
+  }
+  
+  private XDR programVersionMismatch(XDR out, RpcCall call) {
+    LOG.warn("Invalid RPC call version " + call.getVersion());
+    RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH);
+    out.writeInt(lowProgVersion);
+    out.writeInt(highProgVersion);
+    return out;
+  }
+  
+  private boolean checkProgram(int progNumber) {
+    return this.progNumber == progNumber;
+  }
+  
+  /** Return true if a the program version in rpcCall is supported */
+  private boolean checkProgramVersion(int programVersion) {
+    return programVersion >= lowProgVersion
+        && programVersion <= highProgVersion;
+  }
+  
+  @Override
+  public String toString() {
+    return "Rpc program: " + program + " at " + host + ":" + port;
+  }
+  
+  protected abstract boolean isIdempotent(RpcCall call);
+  
+  public int getPort() {
+    return port;
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Common base of the RPC reply messages defined in RFC 1831. A reply is
+ * either accepted or denied, as told by its {@link ReplyState}.
+ */
+public abstract class RpcReply extends RpcMessage {
+  /** RPC reply_stat as defined in RFC 1831 */
+  public enum ReplyState {
+    MSG_ACCEPTED(0),
+    MSG_DENIED(1);
+
+    private final int value;
+
+    ReplyState(int value) {
+      this.value = value;
+    }
+
+    int getValue() {
+      return this.value;
+    }
+
+    /** Look up the constant whose position matches the given wire value. */
+    public static ReplyState fromValue(int value) {
+      final ReplyState[] all = values();
+      return all[value];
+    }
+  }
+
+  private final ReplyState state;
+
+  RpcReply(int xid, int messageType, ReplyState state) {
+    super(xid, messageType);
+    // Only messages of type RPC_REPLY may be represented by this class.
+    validateMessageType(RPC_REPLY);
+    this.state = state;
+  }
+
+  /**
+   * Decode a reply from the stream: xid, message type and reply state are
+   * read here, the rest is left to the matching reply subclass.
+   *
+   * @param xdr stream positioned at the start of the reply
+   * @return the decoded reply
+   */
+  public static RpcReply read(XDR xdr) {
+    final int xid = xdr.readInt();
+    final int messageType = xdr.readInt();
+    final ReplyState stat = ReplyState.fromValue(xdr.readInt());
+    if (stat == ReplyState.MSG_ACCEPTED) {
+      return RpcAcceptedReply.read(xid, messageType, stat, xdr);
+    }
+    if (stat == ReplyState.MSG_DENIED) {
+      return RpcDeniedReply.read(xid, messageType, stat, xdr);
+    }
+    return null;
+  }
+
+  /** @return whether the reply was accepted or denied */
+  public ReplyState getState() {
+    return this.state;
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Generator for the XID used in RPC calls. The counter is seeded from the
+ * current time so that a new seed is used after each restart.
+ */
+public class RpcUtil {
+  private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
+
+  /**
+   * Produce the next transaction id. The shared counter is advanced by one
+   * plus the hash code of {@code caller}, and the new counter value is
+   * returned.
+   * <p>
+   * The method is synchronized because the update is a read-modify-write of
+   * shared state; without it concurrent callers could lose updates and
+   * receive the same id.
+   *
+   * @param caller name of the caller, mixed into the id
+   * @return the new transaction id
+   */
+  public static synchronized int getNewXid(String caller) {
+    xid = xid + 1 + caller.hashCode();
+    return xid;
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.oncrpc.RpcFrameDecoder;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * Minimal TCP based RPC client: connects to a server and lets
+ * {@link SimpleTcpClientHandler} send a single prepared request.
+ */
+public class SimpleTcpClient {
+  protected final String host;
+  protected final int port;
+  protected final XDR request;
+  protected ChannelPipelineFactory pipelineFactory;
+  protected final boolean oneShot;
+
+  /** Create a one-shot client for the given server and request. */
+  public SimpleTcpClient(String host, int port, XDR request) {
+    this(host, port, request, true);
+  }
+
+  /**
+   * @param host server host
+   * @param port server port
+   * @param request request to send
+   * @param oneShot if true, {@link #run()} blocks until the connection is
+   *          closed and then releases the client's resources
+   */
+  public SimpleTcpClient(String host, int port, XDR request, Boolean oneShot) {
+    this.oneShot = oneShot;
+    this.request = request;
+    this.port = port;
+    this.host = host;
+  }
+
+  /**
+   * Install and return the pipeline factory: a frame decoder followed by the
+   * handler that sends the request.
+   */
+  protected ChannelPipelineFactory setPipelineFactory() {
+    final ChannelPipelineFactory created = new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        final RpcFrameDecoder decoder = new RpcFrameDecoder();
+        final SimpleTcpClientHandler handler =
+            new SimpleTcpClientHandler(request);
+        return Channels.pipeline(decoder, handler);
+      }
+    };
+    this.pipelineFactory = created;
+    return created;
+  }
+
+  /** Connect to the server; in one-shot mode also wait for the close. */
+  public void run() {
+    // Client bootstrap backed by cached thread pools
+    final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
+    final ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+
+    // Pipeline and socket options
+    bootstrap.setPipelineFactory(setPipelineFactory());
+    bootstrap.setOption("tcpNoDelay", true);
+    bootstrap.setOption("keepAlive", true);
+
+    // Kick off the connection attempt
+    final InetSocketAddress server = new InetSocketAddress(host, port);
+    final ChannelFuture connectFuture = bootstrap.connect(server);
+
+    if (!oneShot) {
+      return;
+    }
+
+    // Block until the connection is closed or the connection attempt fails,
+    connectFuture.getChannel().getCloseFuture().awaitUninterruptibly();
+
+    // then shut down the thread pools so the client can exit.
+    bootstrap.releaseExternalResources();
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpClientHandler extends SimpleChannelHandler {
+  public static final Log LOG = LogFactory.getLog(SimpleTcpClient.class);
+  protected final XDR request;
+
+  public SimpleTcpClientHandler(XDR request) {
+    this.request = request;
+  }
+
+  @Override
+  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+    // Send the request
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("sending PRC request");
+    }
+    ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
+    e.getChannel().write(outBuf);
+  }
+
+  /**
+   * Shutdown connection by default. Subclass can override this method to do
+   * more interaction with the server.
+   */
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    e.getChannel().close();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("Unexpected exception from downstream: ", e.getCause());
+    e.getChannel().close();
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * Simple TCP server implemented using netty. Incoming bytes pass through an
+ * {@link RpcFrameDecoder} and are then handled by a
+ * {@link SimpleTcpServerHandler} bound to the configured RPC program.
+ */
+public class SimpleTcpServer {
+  public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
+  protected final int port;
+  protected final ChannelPipelineFactory pipelineFactory;
+  protected final RpcProgram rpcProgram;
+
+  /** The maximum number of I/O worker threads; 0 selects netty's default. */
+  protected final int workerCount;
+
+  /**
+   * @param port TCP port where to start the server at
+   * @param program RPC program corresponding to the server
+   * @param workercount Number of worker threads
+   */
+  public SimpleTcpServer(int port, RpcProgram program, int workercount) {
+    this.workerCount = workercount;
+    this.rpcProgram = program;
+    this.port = port;
+    // Assigned last so every other field is already set when the factory
+    // method runs.
+    this.pipelineFactory = getPipelineFactory();
+  }
+
+  /**
+   * @return a factory whose pipelines decode RPC frames and dispatch them to
+   *         a handler for this server's RPC program
+   */
+  public ChannelPipelineFactory getPipelineFactory() {
+    return new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        RpcFrameDecoder decoder = new RpcFrameDecoder();
+        SimpleTcpServerHandler handler = new SimpleTcpServerHandler(rpcProgram);
+        return Channels.pipeline(decoder, handler);
+      }
+    };
+  }
+
+  /**
+   * Creates the server channel factory, binds to the configured port and logs
+   * that the server has started.
+   */
+  public void run() {
+    // A worker count of 0 means "use the default workers: 2 * the number of
+    // available processors"; any other value is passed through to netty.
+    final ChannelFactory channelFactory = (workerCount != 0)
+        ? new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+            Executors.newCachedThreadPool(), workerCount)
+        : new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+            Executors.newCachedThreadPool());
+
+    ServerBootstrap server = new ServerBootstrap(channelFactory);
+    server.setPipelineFactory(pipelineFactory);
+    server.setOption("child.tcpNoDelay", true);
+    server.setOption("child.keepAlive", true);
+
+    // Listen to TCP port
+    InetSocketAddress listenAddress = new InetSocketAddress(port);
+    server.bind(listenAddress);
+
+    String startedMessage = "Started listening to TCP requests at port " + port
+        + " for " + rpcProgram + " with workerCount " + workerCount;
+    LOG.info(startedMessage);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpServerHandler extends SimpleChannelHandler {
+  public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class);
+
+  protected final RpcProgram rpcProgram;
+
+  public SimpleTcpServerHandler(RpcProgram rpcProgram) {
+    this.rpcProgram = rpcProgram;
+  }
+  
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+    XDR request = new XDR(buf.array());
+    
+    InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
+        .getRemoteAddress()).getAddress();
+    Channel outChannel = e.getChannel();
+    XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel);
+    if (response.size() > 0) {
+      outChannel.write(XDR.writeMessageTcp(response, true));
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("Encountered ", e.getCause());
+    e.getChannel().close();
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.Arrays;
+
+/**
+ * A simple UDP based RPC client which just sends one request to a server.
+ */
+public class SimpleUdpClient {
+  protected final String host;
+  protected final int port;
+  protected final XDR request;
+  protected final boolean oneShot;
+
+  /**
+   * Creates a one-shot client.
+   *
+   * @param host name or address of the server
+   * @param port UDP port of the server
+   * @param request the request to send
+   */
+  public SimpleUdpClient(String host, int port, XDR request) {
+    this(host, port, request, true);
+  }
+
+  /**
+   * @param host name or address of the server
+   * @param port UDP port of the server
+   * @param request the request to send
+   * @param oneShot stored in {@link #oneShot}; {@link #run()} does not read it
+   */
+  public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
+    this.host = host;
+    this.port = port;
+    this.request = request;
+    this.oneShot = oneShot;
+  }
+
+  /**
+   * Sends the request in one datagram, waits for one reply datagram and checks
+   * that the reply was accepted.
+   * <p>
+   * No receive timeout is set, so this call blocks until a datagram arrives.
+   *
+   * @throws IOException if the host cannot be resolved, if sending or
+   *           receiving fails, or if the reply state is not
+   *           {@code MSG_ACCEPTED}
+   */
+  public void run() throws IOException {
+    // Resolve and encode before opening the socket so that a failure here
+    // cannot leak it.
+    InetAddress serverAddress = InetAddress.getByName(host);
+    byte[] sendData = request.getBytes();
+    byte[] receiveData = new byte[65535];
+
+    DatagramSocket clientSocket = new DatagramSocket();
+    try {
+      DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
+          serverAddress, port);
+      clientSocket.send(sendPacket);
+      DatagramPacket receivePacket = new DatagramPacket(receiveData,
+          receiveData.length);
+      clientSocket.receive(receivePacket);
+
+      // Check reply status, using only the bytes that were actually received.
+      XDR xdr = new XDR();
+      xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
+          receivePacket.getLength()));
+      RpcReply reply = RpcReply.read(xdr);
+      if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
+        throw new IOException("Request failed: " + reply.getState());
+      }
+    } finally {
+      // Always release the socket, including on the failure paths above.
+      clientSocket.close();
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+
+/**
+ * Simple UDP server implemented based on netty. Every received datagram is
+ * handled by a {@link SimpleUdpServerHandler} bound to the configured RPC
+ * program.
+ */
+public class SimpleUdpServer {
+  public static final Log LOG = LogFactory.getLog(SimpleUdpServer.class);
+  private static final int SEND_BUFFER_SIZE = 65536;
+  private static final int RECEIVE_BUFFER_SIZE = 65536;
+
+  protected final int port;
+  // NOTE(review): this factory is built in the constructor but run() does not
+  // use it; run() installs its handler directly on the bootstrap pipeline.
+  protected final ChannelPipelineFactory pipelineFactory;
+  protected final RpcProgram rpcProgram;
+  protected final int workerCount;
+
+  /**
+   * @param port UDP port where to start the server at
+   * @param program RPC program corresponding to the server
+   * @param workerCount number of worker threads passed to netty
+   */
+  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+    this.port = port;
+    this.rpcProgram = program;
+    this.workerCount = workerCount;
+    this.pipelineFactory = new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
+      }
+    };
+  }
+
+  /**
+   * Creates the datagram channel, binds it to the configured port and logs
+   * that the server has started.
+   */
+  public void run() {
+    // Configure the server.
+    DatagramChannelFactory f = new NioDatagramChannelFactory(
+        Executors.newCachedThreadPool(), workerCount);
+
+    ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+    ChannelPipeline p = b.getPipeline();
+    p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+
+    b.setOption("broadcast", "false");
+    b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
+    b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
+    // "receiveBufferSize" only sizes the socket buffer. netty allocates the
+    // buffer for each received datagram from the receive-buffer-size
+    // predictor, whose default is much smaller (768 bytes in netty 3.x;
+    // verify against the netty version in use), so larger requests would be
+    // truncated without this option.
+    b.setOption("receiveBufferSizePredictorFactory",
+        new org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory(
+            RECEIVE_BUFFER_SIZE));
+
+    // Listen to the UDP port
+    b.bind(new InetSocketAddress(port));
+
+    LOG.info("Started listening to UDP requests at port " + port + " for "
+        + rpcProgram + " with workerCount " + workerCount);
+  }
+}

Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java Fri Jun  7 21:45:06 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleUdpServer}.
+ */
+public class SimpleUdpServerHandler extends SimpleChannelHandler {
+  public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class);
+  private final RpcProgram rpcProgram;
+
+  public SimpleUdpServerHandler(RpcProgram rpcProgram) {
+    this.rpcProgram = rpcProgram;
+  }
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+
+    XDR request = new XDR();
+
+    request.writeFixedOpaque(buf.array());
+    InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
+        .getAddress();
+    XDR response = rpcProgram.handle(request, remoteInetAddr, null);
+    e.getChannel().write(XDR.writeMessageUdp(response), e.getRemoteAddress());
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOG.warn("Encountered ", e.getCause());
+    e.getChannel().close();
+  }
+}
\ No newline at end of file