You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2013/06/07 23:45:07 UTC
svn commit: r1490845 [1/2] - in /hadoop/common/trunk/hadoop-common-project:
./ hadoop-common/ hadoop-nfs/ hadoop-nfs/src/ hadoop-nfs/src/main/
hadoop-nfs/src/main/java/ hadoop-nfs/src/main/java/org/
hadoop-nfs/src/main/java/org/apache/ hadoop-nfs/src/m...
Author: brandonli
Date: Fri Jun 7 21:45:06 2013
New Revision: 1490845
URL: http://svn.apache.org/r1490845
Log:
HADOOP-9509. Implement ONCRPC and XDR. Contributed by Brandon Li
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/ (with props)
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapMapping.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthInfo.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAuthSys.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/pom.xml
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1490845&r1=1490844&r2=1490845&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Jun 7 21:45:06 2013
@@ -12,6 +12,8 @@ Trunk (Unreleased)
HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
hadoop client processes. (Yu Gao via llu)
+ HADOOP-9509. Implement ONCRPC and XDR. (brandonli)
+
IMPROVEMENTS
HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution
Propchange: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Jun 7 21:45:06 2013
@@ -0,0 +1,5 @@
+.classpath
+.git
+.project
+.settings
+target
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/README.txt Fri Jun 7 21:45:06 2013
@@ -0,0 +1,10 @@
+Hadoop NFS
+
+Hadoop NFS is a Java library for building an NFS gateway. It has
+the following components:
+
+- ONCRPC: This is an implementation of ONCRPC (RFC-5531) and XDR (RFC-4506).
+- Mount: This is an interface implementation of the MOUNT protocol (RFC-1813).
+- Portmap: This is an implementation of the Binding protocol (RFC-1833).
+- NFSv3: This is an interface implementation of the NFSv3 protocol (RFC-1813).
+
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/pom.xml Fri Jun 7 21:45:06 2013
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <relativePath>../../hadoop-project</relativePath>
+ </parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-nfs</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>Apache Hadoop NFS</name>
+ <description>Apache Hadoop NFS library</description>
+
+ <properties>
+ <maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
+ <kerberos.realm>LOCALHOST</kerberos.realm>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <!-- Used, even though 'mvn dependency:analyze' doesn't find it -->
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.8.5</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.6.2.Final</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>11.0.2</version>
+ </dependency>
+ </dependencies>
+</project>
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+
+/**
+ * A simple client that registers an RPC program with portmap.
+ * <p>
+ * The nested {@link RegistrationClientHandler} decodes the reply to the
+ * registration request and logs whether the mapping was accepted.
+ */
+public class RegistrationClient extends SimpleTcpClient {
+  public static final Log LOG = LogFactory.getLog(RegistrationClient.class);
+
+  /**
+   * @param host host passed through to {@link SimpleTcpClient}
+   * @param port port passed through to {@link SimpleTcpClient}
+   * @param request XDR request passed through to {@link SimpleTcpClient}
+   */
+  public RegistrationClient(String host, int port, XDR request) {
+    super(host, port, request);
+  }
+
+  /**
+   * Handler to handle response from the server.
+   */
+  static class RegistrationClientHandler extends SimpleTcpClientHandler {
+    /** Minimal size in bytes of a successful response (portmapV2). */
+    private static final int MIN_RESPONSE_SIZE = 28;
+
+    /** Size in bytes of the TCP record-marking fragment header. */
+    private static final int FRAGMENT_HEADER_SIZE = 4;
+
+    public RegistrationClientHandler(XDR request) {
+      super(request);
+    }
+
+    /**
+     * Check that a response is long enough to be a successful reply.
+     *
+     * @param len number of readable bytes in the response
+     * @return false (after a debug log) if {@code len} is below the minimum
+     */
+    private boolean validMessageLength(int len) {
+      if (len < MIN_RESPONSE_SIZE) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Portmap mapping registration failed,"
+              + " the response size is less than " + MIN_RESPONSE_SIZE
+              + " bytes:" + len);
+        }
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Decode the reply, log its outcome and close the channel. A reply that
+     * is too short closes the channel without being decoded.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+      ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply
+      if (!validMessageLength(buf.readableBytes())) {
+        e.getChannel().close();
+        return;
+      }
+
+      // NOTE(review): buf.array() is indexed from 0, which presumes an
+      // array-backed buffer whose readable bytes start at offset 0 - confirm
+      // against the buffers produced by the pipeline.
+      byte[] fragmentHeader = Arrays.copyOfRange(buf.array(), 0,
+          FRAGMENT_HEADER_SIZE);
+      int fragmentSize = XDR.fragmentSize(fragmentHeader);
+      boolean isLast = XDR.isLastFragment(fragmentHeader);
+      // Only checked when assertions are enabled (-ea).
+      assert (fragmentSize == MIN_RESPONSE_SIZE && isLast == true);
+
+      // Everything after the fragment header is the RPC reply itself.
+      XDR xdr = new XDR();
+      xdr.writeFixedOpaque(Arrays.copyOfRange(buf.array(),
+          FRAGMENT_HEADER_SIZE, buf.readableBytes()));
+
+      RpcReply reply = RpcReply.read(xdr);
+      if (reply.getState() == RpcReply.ReplyState.MSG_ACCEPTED) {
+        RpcAcceptedReply acceptedReply = (RpcAcceptedReply) reply;
+        handle(acceptedReply, xdr);
+      } else {
+        RpcDeniedReply deniedReply = (RpcDeniedReply) reply;
+        handle(deniedReply);
+      }
+      e.getChannel().close(); // shutdown now that request is complete
+    }
+
+    /** Log a denied registration request. */
+    private void handle(RpcDeniedReply deniedReply) {
+      LOG.warn("Portmap mapping registration request was denied , " +
+          deniedReply);
+    }
+
+    /**
+     * Read the boolean result that follows the accepted-reply header and log
+     * exactly one outcome: a warning on failure, an info message on success.
+     *
+     * @param acceptedReply the already decoded reply header
+     * @param xdr the reply data positioned just after the header
+     */
+    private void handle(RpcAcceptedReply acceptedReply, XDR xdr) {
+      AcceptState acceptState = acceptedReply.getAcceptState();
+      // Only checked when assertions are enabled (-ea).
+      assert (acceptState == AcceptState.SUCCESS);
+      boolean answer = xdr.readBoolean();
+      if (!answer) {
+        LOG.warn("Portmap mapping registration failed, accept state:"
+            + acceptState);
+        // Do not fall through to the success message.
+        return;
+      }
+      LOG.info("Portmap mapping registration succeeded");
+    }
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/**
+ * RPC reply body for the MSG_ACCEPTED reply state; see RFC 1831 for details.
+ * This response is sent to a request to indicate success of the request.
+ */
+public class RpcAcceptedReply extends RpcReply {
+  /** Status carried by an accepted reply. */
+  public enum AcceptState {
+    /** RPC executed successfully. */
+    SUCCESS(0),
+    /** Remote hasn't exported program. */
+    PROG_UNAVAIL(1),
+    /** Remote can't support version #. */
+    PROG_MISMATCH(2),
+    /** Program can't support procedure. */
+    PROC_UNAVAIL(3),
+    /** Procedure can't decode params. */
+    GARBAGE_ARGS(4),
+    /** E.g. memory allocation failure. */
+    SYSTEM_ERR(5);
+
+    private final int value;
+
+    AcceptState(int value) {
+      this.value = value;
+    }
+
+    /** @return the integer value written for this state */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Look a state up by position in declaration order; an out-of-range
+     * {@code value} fails with an array index exception.
+     */
+    public static AcceptState fromValue(int value) {
+      final AcceptState[] states = values();
+      return states[value];
+    }
+  }
+
+  private final RpcAuthInfo verifier;
+  private final AcceptState acceptState;
+
+  RpcAcceptedReply(int xid, int messageType, ReplyState state,
+      RpcAuthInfo verifier, AcceptState acceptState) {
+    super(xid, messageType, state);
+    this.verifier = verifier;
+    this.acceptState = acceptState;
+  }
+
+  /**
+   * Read the verifier and then the accept state from {@code xdr} and combine
+   * them with the header fields supplied by the caller.
+   */
+  public static RpcAcceptedReply read(int xid, int messageType,
+      ReplyState replyState, XDR xdr) {
+    // Arguments are evaluated left to right: verifier first, then the state.
+    return new RpcAcceptedReply(xid, messageType, replyState,
+        RpcAuthInfo.read(xdr), AcceptState.fromValue(xdr.readInt()));
+  }
+
+  public AcceptState getAcceptState() {
+    return acceptState;
+  }
+
+  public RpcAuthInfo getVerifier() {
+    return verifier;
+  }
+
+  /** Same as {@link #voidReply(XDR, int, AcceptState)} with SUCCESS. */
+  public static XDR voidReply(XDR xdr, int xid) {
+    return voidReply(xdr, xid, AcceptState.SUCCESS);
+  }
+
+  /**
+   * Write an accepted reply without result data into {@code xdr}: xid,
+   * message type, reply state, an AUTH_NONE verifier with an empty body and
+   * finally the accept state.
+   *
+   * @return the same {@code xdr} instance that was passed in
+   */
+  public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) {
+    final byte[] emptyVerifierBody = new byte[0];
+    xdr.writeInt(xid);
+    xdr.writeInt(RpcMessage.RPC_REPLY);
+    xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue());
+    xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+    xdr.writeVariableOpaque(emptyVerifierBody);
+    xdr.writeInt(acceptState.getValue());
+    return xdr;
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthInfo.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.util.Arrays;
+
+/**
+ * Authentication information (flavor plus opaque body) as defined in
+ * RFC 1831.
+ */
+public class RpcAuthInfo {
+  /** Different types of authentication as defined in RFC 1831 */
+  public enum AuthFlavor {
+    AUTH_NONE(0),
+    AUTH_SYS(1),
+    AUTH_SHORT(2),
+    AUTH_DH(3),
+    RPCSEC_GSS(6);
+
+    private int value;
+
+    AuthFlavor(int value) {
+      this.value = value;
+    }
+
+    /** @return the integer value of this flavor */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Find the flavor whose value equals {@code value}.
+     *
+     * @throws IllegalArgumentException if no flavor has that value
+     */
+    static AuthFlavor fromValue(int value) {
+      final AuthFlavor[] flavors = values();
+      for (int i = 0; i < flavors.length; i++) {
+        if (flavors[i].value == value) {
+          return flavors[i];
+        }
+      }
+      throw new IllegalArgumentException("Invalid AuthFlavor value " + value);
+    }
+  }
+
+  private final AuthFlavor flavor;
+  private final byte[] body;
+
+  protected RpcAuthInfo(AuthFlavor flavor, byte[] body) {
+    this.flavor = flavor;
+    this.body = body;
+  }
+
+  /**
+   * Read an integer flavor followed by a variable-length opaque body from
+   * {@code xdr}.
+   */
+  public static RpcAuthInfo read(XDR xdr) {
+    // Left-to-right evaluation: the flavor is read and resolved before the
+    // body is read.
+    return new RpcAuthInfo(AuthFlavor.fromValue(xdr.readInt()),
+        xdr.readVariableOpaque());
+  }
+
+  public AuthFlavor getFlavor() {
+    return flavor;
+  }
+
+  /** @return a copy of the body, so callers cannot modify the stored array */
+  public byte[] getBody() {
+    final byte[] copy = Arrays.copyOf(body, body.length);
+    return copy;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("(AuthFlavor:").append(flavor).append(")")
+        .toString();
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAuthSys.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * The uid/gid pair of an AUTH_SYS credential as defined in RFC 1831.
+ */
+public class RpcAuthSys {
+  private final int uid;
+  private final int gid;
+
+  public RpcAuthSys(int uid, int gid) {
+    this.uid = uid;
+    this.gid = gid;
+  }
+
+  /**
+   * Decode a credential body: skip the 4-byte stamp and the machine name,
+   * then read the uid followed by the gid.
+   */
+  public static RpcAuthSys from(byte[] credentials) {
+    final XDR xdr = new XDR(credentials);
+    xdr.skip(4); // Stamp
+    xdr.skipVariableOpaque(); // Machine name
+    final int decodedUid = xdr.readInt();
+    final int decodedGid = xdr.readInt();
+    return new RpcAuthSys(decodedUid, decodedGid);
+  }
+
+  public int getUid() {
+    return uid;
+  }
+
+  public int getGid() {
+    return gid;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("(AuthSys: uid=");
+    text.append(uid).append(" gid=").append(gid).append(")");
+    return text.toString();
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Represents an RPC message of type RPC call as defined in RFC 1831
+ */
+public class RpcCall extends RpcMessage {
+  /** The RPC protocol version written by {@link #write} and required by
+   *  {@link #validate()}. */
+  public static final int RPC_VERSION = 2;
+  private static final Log LOG = LogFactory.getLog(RpcCall.class);
+  private final int rpcVersion;
+  private final int program;
+  private final int version;
+  private final int procedure;
+  private final RpcAuthInfo credential;
+  private final RpcAuthInfo verifier;
+
+  /**
+   * Store the call header fields, trace-log the call if trace logging is
+   * enabled, and validate it.
+   *
+   * @throws IllegalArgumentException if {@code rpcVersion} is not
+   *           {@link #RPC_VERSION}
+   */
+  protected RpcCall(int xid, int messageType, int rpcVersion, int program,
+      int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) {
+    super(xid, messageType);
+    this.rpcVersion = rpcVersion;
+    this.program = program;
+    this.version = version;
+    this.procedure = procedure;
+    this.credential = credential;
+    this.verifier = verifier;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this);
+    }
+    validate();
+  }
+
+  private void validateRpcVersion() {
+    if (rpcVersion != RPC_VERSION) {
+      throw new IllegalArgumentException("RPC version is expected to be "
+          + RPC_VERSION + " but got " + rpcVersion);
+    }
+  }
+
+  /**
+   * Check the message type via {@code validateMessageType(RPC_CALL)} and then
+   * the RPC version.
+   *
+   * @throws IllegalArgumentException if the RPC version is not
+   *           {@link #RPC_VERSION}
+   */
+  public void validate() {
+    validateMessageType(RPC_CALL);
+    validateRpcVersion();
+    // Validate other members
+    // Throw exception if validation fails
+  }
+
+  public int getRpcVersion() {
+    return rpcVersion;
+  }
+
+  public int getProgram() {
+    return program;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public int getProcedure() {
+    return procedure;
+  }
+
+  public RpcAuthInfo getCredential() {
+    return credential;
+  }
+
+  public RpcAuthInfo getVerifier() {
+    return verifier;
+  }
+
+  /**
+   * Read a call from {@code xdr}: six integers (xid, message type, RPC
+   * version, program, version, procedure) followed by the credential and the
+   * verifier.
+   */
+  public static RpcCall read(XDR xdr) {
+    return new RpcCall(xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(),
+        xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr),
+        RpcAuthInfo.read(xdr));
+  }
+
+  /**
+   * Write the integer part of a call header to {@code out}: xid, message
+   * type, RPC version, program, program version and procedure. Credential
+   * and verifier are not written by this method.
+   */
+  public static void write(XDR out, int xid, int program, int progVersion,
+      int procedure) {
+    out.writeInt(xid);
+    out.writeInt(RpcMessage.RPC_CALL);
+    // Use the shared constant so the written version always matches the one
+    // validate() accepts.
+    out.writeInt(RPC_VERSION);
+    out.writeInt(program);
+    out.writeInt(progVersion);
+    out.writeInt(procedure);
+  }
+
+  @Override
+  public String toString() {
+    // credential and verifier are passed as objects so that a null value is
+    // printed as "null" instead of raising a NullPointerException (toString
+    // is also reached from the constructor's trace logging).
+    return String.format("Xid:%d, messageType:%d, rpcVersion:%d, program:%d,"
+        + " version:%d, procedure:%d, credential:%s, verifier:%s", xid,
+        messageType, rpcVersion, program, version, procedure,
+        credential, verifier);
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for handling the duplicate <em>non-idempotent</em> Rpc
+ * calls. A non-idempotent request is processed as follows:
+ * <ul>
+ * <li>If the request is being processed for the first time, its state is
+ * in-progress in cache.</li>
+ * <li>If the request is retransmitted and is in-progress state, it is ignored.
+ * </li>
+ * <li>If the request is retransmitted and is completed, the previous response
+ * from the cache is sent back to the client.</li>
+ * </ul>
+ * <br>
+ * A request is identified by the client ID (address of the client) and
+ * transaction ID (xid) from the Rpc call.
+ * <p>
+ * Every access this class makes to the underlying map, except the one in
+ * {@link #iterator()}, is synchronized on the map.
+ */
+public class RpcCallCache {
+
+  /** State of one cached request: in progress until a response is set. */
+  public static class CacheEntry {
+    private XDR response; // null if no response has been sent
+
+    public CacheEntry() {
+      response = null;
+    }
+
+    public boolean isInProgress() {
+      return response == null;
+    }
+
+    public boolean isCompleted() {
+      return response != null;
+    }
+
+    public XDR getResponse() {
+      return response;
+    }
+
+    public void setResponse(XDR response) {
+      this.response = response;
+    }
+  }
+
+  /**
+   * Call that is used to track a client in the {@link RpcCallCache}
+   */
+  public static class ClientRequest {
+    protected final InetAddress clientId;
+    protected final int xid;
+
+    public InetAddress getClientId() {
+      return clientId;
+    }
+
+    public ClientRequest(InetAddress clientId, int xid) {
+      this.clientId = clientId;
+      this.xid = xid;
+    }
+
+    @Override
+    public int hashCode() {
+      return xid + clientId.hashCode() * 31;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      // instanceof is false for null, so no separate null check is needed.
+      if (!(obj instanceof ClientRequest)) {
+        return false;
+      }
+      ClientRequest other = (ClientRequest) obj;
+      return clientId.equals(other.clientId) && (xid == other.xid);
+    }
+  }
+
+  private final String program;
+
+  private final Map<ClientRequest, CacheEntry> map;
+
+  /**
+   * @param program program name, returned by {@link #getProgram()}
+   * @param maxEntries maximum number of entries kept; once it is exceeded
+   *          the eldest entry is removed on insertion
+   * @throws IllegalArgumentException if {@code maxEntries} is not positive
+   */
+  public RpcCallCache(final String program, final int maxEntries) {
+    if (maxEntries <= 0) {
+      throw new IllegalArgumentException("Cache size is " + maxEntries
+          + ". Should be > 0");
+    }
+    this.program = program;
+    map = new LinkedHashMap<ClientRequest, CacheEntry>() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      protected boolean removeEldestEntry(
+          Entry<ClientRequest, CacheEntry> eldest) {
+        // size() here is the map's own size.
+        return size() > maxEntries;
+      }
+    };
+  }
+
+  /** Return the program name */
+  public String getProgram() {
+    return program;
+  }
+
+  /**
+   * Mark a request as completed and add corresponding response to the cache.
+   * If the request has no entry (it was never added, or it has already been
+   * evicted), a new completed entry is inserted instead of failing with a
+   * NullPointerException.
+   */
+  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+    ClientRequest req = new ClientRequest(clientId, xid);
+    synchronized(map) {
+      CacheEntry e = map.get(req);
+      if (e == null) {
+        e = new CacheEntry();
+        map.put(req, e);
+      }
+      // Set under the lock so that a later checkOrAddToCache, which takes
+      // the same lock, observes the response.
+      e.setResponse(response);
+    }
+  }
+
+  /**
+   * Check the cache for an entry. If it does not exist, add the request
+   * as in progress.
+   *
+   * @return the existing entry, or null if the request was not cached yet
+   *         (in which case an in-progress entry has just been added)
+   */
+  public CacheEntry checkOrAddToCache(InetAddress clientId, int xid) {
+    ClientRequest req = new ClientRequest(clientId, xid);
+    CacheEntry e;
+    synchronized(map) {
+      e = map.get(req);
+      if (e == null) {
+        // Add an inprogress cache entry
+        map.put(req, new CacheEntry());
+      }
+    }
+    return e;
+  }
+
+  /** Return number of cached entries */
+  public int size() {
+    // Same lock as the mutating methods, so the size is never read while
+    // the map is being modified.
+    synchronized(map) {
+      return map.size();
+    }
+  }
+
+  /**
+   * Iterator to the cache entries. The iterator is handed out without
+   * holding the map lock.
+   * @return iterator
+   */
+  @VisibleForTesting
+  public Iterator<Entry<ClientRequest, CacheEntry>> iterator() {
+    return map.entrySet().iterator();
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+
+/**
+ * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.
+ * This response is sent to a request to indicate failure of the request.
+ */
+public class RpcDeniedReply extends RpcReply {
+ public enum RejectState {
+ RPC_MISMATCH(0), AUTH_ERROR(1);
+
+ private final int value;
+
+ RejectState(int value) {
+ this.value = value;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ static RejectState fromValue(int value) {
+ return values()[value];
+ }
+ }
+
+ private final RejectState rejectState;
+
+ RpcDeniedReply(int xid, int messageType, ReplyState replyState,
+ RejectState rejectState) {
+ super(xid, messageType, replyState);
+ this.rejectState = rejectState;
+ }
+
+ public static RpcDeniedReply read(int xid, int messageType,
+ ReplyState replyState, XDR xdr) {
+ RejectState rejectState = RejectState.fromValue(xdr.readInt());
+ return new RpcDeniedReply(xid, messageType, replyState, rejectState);
+ }
+
+ public RejectState getRejectState() {
+ return rejectState;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuffer().append("xid:").append(xid)
+ .append(",messageType:").append(messageType).append("rejectState:")
+ .append(rejectState).toString();
+ }
+
+ public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted,
+ RejectState rejectState) {
+ xdr.writeInt(xid);
+ xdr.writeInt(RpcMessage.RPC_REPLY);
+ xdr.writeInt(msgAccepted.getValue());
+ xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
+ xdr.writeVariableOpaque(new byte[0]);
+ xdr.writeInt(rejectState.getValue());
+ return xdr;
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcFrameDecoder.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * {@link FrameDecoder} for RPC messages.
+ */
+public class RpcFrameDecoder extends FrameDecoder {
+ public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+ private ChannelBuffer frame;
+
+ /**
+ * Decode an RPC message received on the socket.
+ * @return mpnull if incomplete message is received.
+ */
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buf) {
+
+ // Make sure if the length field was received.
+ if (buf.readableBytes() < 4) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Length field is not received yet");
+ }
+ return null;
+ }
+
+ // Note the index and go back to it when an incomplete message is received
+ buf.markReaderIndex();
+
+ // Read the record marking.
+ ChannelBuffer fragmentHeader = buf.readBytes(4);
+ int length = XDR.fragmentSize(fragmentHeader.array());
+ boolean isLast = XDR.isLastFragment(fragmentHeader.array());
+
+ // Make sure if there's enough bytes in the buffer.
+ if (buf.readableBytes() < length) {
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(length + " bytes are not received yet");
+ }
+ buf.resetReaderIndex(); // Go back to the right reader index
+ return null;
+ }
+
+ if (frame == null) {
+ frame = buf.readBytes(length);
+ } else {
+ ChannelBuffer tmp = ChannelBuffers.copiedBuffer(frame.array(), buf
+ .readBytes(length).array());
+ frame = tmp;
+ }
+
+ // Successfully decoded a frame. Return the decoded frame if the frame is
+ // the last one. Otherwise, wait for the next frame.
+ if (isLast) {
+ ChannelBuffer completeFrame = frame;
+ frame = null;
+ return completeFrame;
+ } else {
+ LOG.info("Wait for the next frame. This rarely happens.");
+ return null;
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Represent an RPC message as defined in RFC 1831.
+ */
+public abstract class RpcMessage {
+ public static final int RPC_CALL = 0;
+ public static final int RPC_REPLY = 1;
+
+ protected final int xid;
+ protected final int messageType;
+
+ RpcMessage(int xid, int messageType) {
+ if (messageType != RPC_CALL && messageType != RPC_REPLY) {
+ throw new IllegalArgumentException("Invalid message type " + messageType);
+ }
+ this.xid = xid;
+ this.messageType = messageType;
+ }
+
+ public int getXid() {
+ return xid;
+ }
+
+ public int getMessageType() {
+ return messageType;
+ }
+
+ protected void validateMessageType(int expected) {
+ if (expected != messageType) {
+ throw new IllegalArgumentException("Message type is expected to be "
+ + expected + " but got " + messageType);
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
+import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.portmap.PortmapMapping;
+import org.apache.hadoop.portmap.PortmapRequest;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Class for writing RPC server programs based on RFC 1050. Extend this class
+ * and implement {@link #handleInternal} to handle the requests received.
+ */
+public abstract class RpcProgram {
+ private static final Log LOG = LogFactory.getLog(RpcProgram.class);
+ public static final int RPCB_PORT = 111;
+ private final String program;
+ private final String host;
+ private final int port;
+ private final int progNumber;
+ private final int lowProgVersion;
+ private final int highProgVersion;
+ private final RpcCallCache rpcCallCache;
+
+ /**
+ * Constructor
+ *
+ * @param program program name
+ * @param host host where the Rpc server program is started
+ * @param port port where the Rpc server program is listening to
+ * @param progNumber program number as defined in RFC 1050
+ * @param lowProgVersion lowest version of the specification supported
+ * @param highProgVersion highest version of the specification supported
+ * @param cacheSize size of cache to handle duplciate requests. Size <= 0
+ * indicates no cache.
+ */
+ protected RpcProgram(String program, String host, int port, int progNumber,
+ int lowProgVersion, int highProgVersion, int cacheSize) {
+ this.program = program;
+ this.host = host;
+ this.port = port;
+ this.progNumber = progNumber;
+ this.lowProgVersion = lowProgVersion;
+ this.highProgVersion = highProgVersion;
+ this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
+ : null;
+ }
+
+ /**
+ * Register this program with the local portmapper.
+ */
+ public void register(int transport) {
+ // Register all the program versions with portmapper for a given transport
+ for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
+ register(vers, transport);
+ }
+ }
+
+ /**
+ * Register this program with the local portmapper.
+ */
+ private void register(int progVersion, int transport) {
+ PortmapMapping mapEntry = new PortmapMapping(progNumber, progVersion,
+ transport, port);
+ register(mapEntry);
+ }
+
+ /**
+ * Register the program with Portmap or Rpcbind
+ */
+ protected void register(PortmapMapping mapEntry) {
+ XDR mappingRequest = PortmapRequest.create(mapEntry);
+ SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
+ mappingRequest);
+ try {
+ registrationClient.run();
+ } catch (IOException e) {
+ LOG.error("Registration failure with " + host + ":" + port
+ + ", portmap entry: " + mapEntry);
+ throw new RuntimeException("Registration failure");
+ }
+ }
+
+ /**
+ * Handle an RPC request.
+ * @param rpcCall RPC call that is received
+ * @param in xdr with cursor at reading the remaining bytes of a method call
+ * @param out xdr output corresponding to Rpc reply
+ * @param client making the Rpc request
+ * @param channel connection over which Rpc request is received
+ * @return response xdr response
+ */
+ protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
+ InetAddress client, Channel channel);
+
+ public XDR handle(XDR xdr, InetAddress client, Channel channel) {
+ XDR out = new XDR();
+ RpcCall rpcCall = RpcCall.read(xdr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(program + " procedure #" + rpcCall.getProcedure());
+ }
+
+ if (!checkProgram(rpcCall.getProgram())) {
+ return programMismatch(out, rpcCall);
+ }
+
+ if (!checkProgramVersion(rpcCall.getVersion())) {
+ return programVersionMismatch(out, rpcCall);
+ }
+
+ // Check for duplicate requests in the cache for non-idempotent requests
+ boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
+ if (idempotent) {
+ CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
+ if (entry != null) { // in ache
+ if (entry.isCompleted()) {
+ LOG.info("Sending the cached reply to retransmitted request "
+ + rpcCall.getXid());
+ return entry.getResponse();
+ } else { // else request is in progress
+ LOG.info("Retransmitted request, transaction still in progress "
+ + rpcCall.getXid());
+ // TODO: ignore the request?
+ }
+ }
+ }
+
+ XDR response = handleInternal(rpcCall, xdr, out, client, channel);
+ if (response.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No sync response, expect an async response for request XID="
+ + rpcCall.getXid());
+ }
+ }
+
+ // Add the request to the cache
+ if (idempotent) {
+ rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
+ }
+ return response;
+ }
+
+ private XDR programMismatch(XDR out, RpcCall call) {
+ LOG.warn("Invalid RPC call program " + call.getProgram());
+ RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL);
+ return out;
+ }
+
+ private XDR programVersionMismatch(XDR out, RpcCall call) {
+ LOG.warn("Invalid RPC call version " + call.getVersion());
+ RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH);
+ out.writeInt(lowProgVersion);
+ out.writeInt(highProgVersion);
+ return out;
+ }
+
+ private boolean checkProgram(int progNumber) {
+ return this.progNumber == progNumber;
+ }
+
+ /** Return true if a the program version in rpcCall is supported */
+ private boolean checkProgramVersion(int programVersion) {
+ return programVersion >= lowProgVersion
+ && programVersion <= highProgVersion;
+ }
+
+ @Override
+ public String toString() {
+ return "Rpc program: " + program + " at " + host + ":" + port;
+ }
+
+ protected abstract boolean isIdempotent(RpcCall call);
+
+ public int getPort() {
+ return port;
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * Represents an RPC message of type RPC reply as defined in RFC 1831
+ */
+public abstract class RpcReply extends RpcMessage {
+ /** RPC reply_stat as defined in RFC 1831 */
+ public enum ReplyState {
+ MSG_ACCEPTED(0),
+ MSG_DENIED(1);
+
+ private final int value;
+ ReplyState(int value) {
+ this.value = value;
+ }
+
+ int getValue() {
+ return value;
+ }
+
+ public static ReplyState fromValue(int value) {
+ return values()[value];
+ }
+ }
+
+ private final ReplyState state;
+
+ RpcReply(int xid, int messageType, ReplyState state) {
+ super(xid, messageType);
+ this.state = state;
+ validateMessageType(RPC_REPLY);
+ }
+
+ public static RpcReply read(XDR xdr) {
+ int xid = xdr.readInt();
+ int messageType = xdr.readInt();
+ ReplyState stat = ReplyState.fromValue(xdr.readInt());
+ switch (stat) {
+ case MSG_ACCEPTED:
+ return RpcAcceptedReply.read(xid, messageType, stat, xdr);
+ case MSG_DENIED:
+ return RpcDeniedReply.read(xid, messageType, stat, xdr);
+ }
+ return null;
+ }
+
+ public ReplyState getState() {
+ return state;
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+/**
+ * The XID in RPC call. It is used for starting with new seed after each reboot.
+ */
+public class RpcUtil {
+ private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
+
+ public static int getNewXid(String caller) {
+ return xid = ++xid + caller.hashCode();
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.oncrpc.RpcFrameDecoder;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+/**
+ * A simple TCP based RPC client which just sends a request to a server.
+ */
+public class SimpleTcpClient {
+ protected final String host;
+ protected final int port;
+ protected final XDR request;
+ protected ChannelPipelineFactory pipelineFactory;
+ protected final boolean oneShot;
+
+ public SimpleTcpClient(String host, int port, XDR request) {
+ this(host,port, request, true);
+ }
+
+ public SimpleTcpClient(String host, int port, XDR request, Boolean oneShot) {
+ this.host = host;
+ this.port = port;
+ this.request = request;
+ this.oneShot = oneShot;
+ }
+
+ protected ChannelPipelineFactory setPipelineFactory() {
+ this.pipelineFactory = new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ return Channels.pipeline(new RpcFrameDecoder(),
+ new SimpleTcpClientHandler(request));
+ }
+ };
+ return this.pipelineFactory;
+ }
+
+ public void run() {
+ // Configure the client.
+ ChannelFactory factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1);
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(setPipelineFactory());
+
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+
+ // Start the connection attempt.
+ ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+
+ if (oneShot) {
+ // Wait until the connection is closed or the connection attempt fails.
+ future.getChannel().getCloseFuture().awaitUninterruptibly();
+
+ // Shut down thread pools to exit.
+ bootstrap.releaseExternalResources();
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * A simple TCP based RPC client handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpClientHandler extends SimpleChannelHandler {
+ public static final Log LOG = LogFactory.getLog(SimpleTcpClient.class);
+ protected final XDR request;
+
+ public SimpleTcpClientHandler(XDR request) {
+ this.request = request;
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ // Send the request
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sending PRC request");
+ }
+ ChannelBuffer outBuf = XDR.writeMessageTcp(request, true);
+ e.getChannel().write(outBuf);
+ }
+
+ /**
+ * Shutdown connection by default. Subclass can override this method to do
+ * more interaction with the server.
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ e.getChannel().close();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Unexpected exception from downstream: ", e.getCause());
+ e.getChannel().close();
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+/**
+ * Simple UDP server implemented using netty.
+ */
+public class SimpleTcpServer {
+ public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
+ protected final int port;
+ protected final ChannelPipelineFactory pipelineFactory;
+ protected final RpcProgram rpcProgram;
+
+ /** The maximum number of I/O worker threads */
+ protected final int workerCount;
+
+ /**
+ * @param port TCP port where to start the server at
+ * @param program RPC program corresponding to the server
+ * @param workercount Number of worker threads
+ */
+ public SimpleTcpServer(int port, RpcProgram program, int workercount) {
+ this.port = port;
+ this.rpcProgram = program;
+ this.workerCount = workercount;
+ this.pipelineFactory = getPipelineFactory();
+ }
+
+ public ChannelPipelineFactory getPipelineFactory() {
+ return new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ return Channels.pipeline(new RpcFrameDecoder(),
+ new SimpleTcpServerHandler(rpcProgram));
+ }
+ };
+ }
+
+ public void run() {
+ // Configure the Server.
+ ChannelFactory factory;
+ if (workerCount == 0) {
+ // Use default workers: 2 * the number of available processors
+ factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ } else {
+ factory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+ workerCount);
+ }
+
+ ServerBootstrap bootstrap = new ServerBootstrap(factory);
+ bootstrap.setPipelineFactory(pipelineFactory);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+
+ // Listen to TCP port
+ bootstrap.bind(new InetSocketAddress(port));
+
+ LOG.info("Started listening to TCP requests at port " + port + " for "
+ + rpcProgram + " with workerCount " + workerCount);
+ }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleTcpServer}.
+ */
+public class SimpleTcpServerHandler extends SimpleChannelHandler {
+ public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class);
+
+ protected final RpcProgram rpcProgram;
+
+ public SimpleTcpServerHandler(RpcProgram rpcProgram) {
+ this.rpcProgram = rpcProgram;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ XDR request = new XDR(buf.array());
+
+ InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
+ .getRemoteAddress()).getAddress();
+ Channel outChannel = e.getChannel();
+ XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel);
+ if (response.size() > 0) {
+ outChannel.write(XDR.writeMessageTcp(response, true));
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Encountered ", e.getCause());
+ e.getChannel().close();
+ }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.Arrays;
+
+/**
+ * A simple UDP based RPC client which just sends one request to a server.
+ */
+public class SimpleUdpClient {
+  /** Size of the receive buffer used for the reply datagram. */
+  private static final int RECEIVE_BUFFER_LENGTH = 65535;
+
+  protected final String host;
+  protected final int port;
+  protected final XDR request;
+  protected final boolean oneShot;
+
+  /**
+   * Creates a client with {@code oneShot} set to {@code true}.
+   *
+   * @param host server host name or address
+   * @param port server UDP port
+   * @param request the request to send
+   */
+  public SimpleUdpClient(String host, int port, XDR request) {
+    this(host, port, request, true);
+  }
+
+  /**
+   * @param host server host name or address
+   * @param port server UDP port
+   * @param request the request to send
+   * @param oneShot stored in {@link #oneShot}; must not be null because it
+   *          is unboxed here
+   */
+  public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
+    this.host = host;
+    this.port = port;
+    this.request = request;
+    this.oneShot = oneShot;
+  }
+
+  /**
+   * Sends the request as a single datagram, waits for one reply datagram and
+   * verifies that the reply was accepted.
+   *
+   * @throws IOException if sending or receiving fails, or if the reply state
+   *           is not {@code MSG_ACCEPTED}
+   */
+  public void run() throws IOException {
+    DatagramSocket clientSocket = new DatagramSocket();
+    // Close the socket on every exit path; previously it leaked whenever
+    // anything below threw (I/O error, malformed or rejected reply).
+    try {
+      InetAddress IPAddress = InetAddress.getByName(host);
+      byte[] sendData = request.getBytes();
+      byte[] receiveData = new byte[RECEIVE_BUFFER_LENGTH];
+
+      DatagramPacket sendPacket = new DatagramPacket(sendData,
+          sendData.length, IPAddress, port);
+      clientSocket.send(sendPacket);
+      DatagramPacket receivePacket = new DatagramPacket(receiveData,
+          receiveData.length);
+      clientSocket.receive(receivePacket);
+
+      // Check reply status. Only the bytes actually received are parsed.
+      XDR xdr = new XDR();
+      xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
+          receivePacket.getLength()));
+      RpcReply reply = RpcReply.read(xdr);
+      if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
+        throw new IOException("Request failed: " + reply.getState());
+      }
+    } finally {
+      clientSocket.close();
+    }
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+
+/**
+ * Simple UDP server implemented based on netty.
+ */
+public class SimpleUdpServer {
+  public static final Log LOG = LogFactory.getLog(SimpleUdpServer.class);
+  private static final int SEND_BUFFER_SIZE = 65536;
+  private static final int RECEIVE_BUFFER_SIZE = 65536;
+
+  protected final int port;
+  protected final ChannelPipelineFactory pipelineFactory;
+  protected final RpcProgram rpcProgram;
+  protected final int workerCount;
+
+  /**
+   * @param port UDP port to listen on
+   * @param program RPC program that handles the received requests
+   * @param workerCount worker count passed to the datagram channel factory
+   */
+  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+    this.port = port;
+    this.rpcProgram = program;
+    this.workerCount = workerCount;
+    this.pipelineFactory = new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
+      }
+    };
+  }
+
+  /**
+   * Binds the server to its UDP port and starts handling RPC requests.
+   */
+  public void run() {
+    // Configure the server.
+    DatagramChannelFactory f = new NioDatagramChannelFactory(
+        Executors.newCachedThreadPool(), workerCount);
+
+    ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+    // Install the factory built in the constructor instead of assembling an
+    // equivalent pipeline by hand; the field was otherwise never used.
+    b.setPipelineFactory(pipelineFactory);
+
+    b.setOption("broadcast", "false");
+    b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
+    b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
+
+    // Listen to the UDP port
+    b.bind(new InetSocketAddress(port));
+
+    LOG.info("Started listening to UDP requests at port " + port + " for "
+        + rpcProgram + " with workerCount " + workerCount);
+  }
+}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java?rev=1490845&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java Fri Jun 7 21:45:06 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.oncrpc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Handler used by {@link SimpleUdpServer}.
+ */
+public class SimpleUdpServerHandler extends SimpleChannelHandler {
+ public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class);
+ private final RpcProgram rpcProgram;
+
+ public SimpleUdpServerHandler(RpcProgram rpcProgram) {
+ this.rpcProgram = rpcProgram;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+
+ XDR request = new XDR();
+
+ request.writeFixedOpaque(buf.array());
+ InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
+ .getAddress();
+ XDR response = rpcProgram.handle(request, remoteInetAddr, null);
+ e.getChannel().write(XDR.writeMessageUdp(response), e.getRemoteAddress());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("Encountered ", e.getCause());
+ e.getChannel().close();
+ }
+}
\ No newline at end of file