You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/04/25 02:19:34 UTC
svn commit: r1589915 - in
/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src:
main/java/org/apache/hadoop/oncrpc/ test/java/org/apache/hadoop/oncrpc/
Author: atm
Date: Fri Apr 25 00:19:34 2014
New Revision: 1589915
URL: http://svn.apache.org/r1589915
Log:
HDFS-6281. Provide option to use the NFS Gateway without having to use the Hadoop portmapper. Contributed by Aaron T. Myers.
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1589915&r1=1589914&r2=1589915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Fri Apr 25 00:19:34 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.oncrpc;
import java.io.IOException;
+import java.net.DatagramSocket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,6 +47,12 @@ public abstract class RpcProgram extends
private final int highProgVersion;
/**
+ * If not null, this socket is used to connect to the system portmap
+ * daemon when registering this RPC server program.
+ */
+ private final DatagramSocket registrationSocket;
+
+ /**
* Constructor
*
* @param program program name
@@ -56,13 +63,15 @@ public abstract class RpcProgram extends
* @param highProgVersion highest version of the specification supported
*/
protected RpcProgram(String program, String host, int port, int progNumber,
- int lowProgVersion, int highProgVersion) {
+ int lowProgVersion, int highProgVersion,
+ DatagramSocket registrationSocket) {
this.program = program;
this.host = host;
this.port = port;
this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
+ this.registrationSocket = registrationSocket;
}
/**
@@ -105,14 +114,14 @@ public abstract class RpcProgram extends
protected void register(PortmapMapping mapEntry, boolean set) {
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
- mappingRequest);
+ mappingRequest, registrationSocket);
try {
registrationClient.run();
} catch (IOException e) {
String request = set ? "Registration" : "Unregistration";
LOG.error(request + " failure with " + host + ":" + port
- + ", portmap entry: " + mapEntry);
- throw new RuntimeException(request + " failure");
+ + ", portmap entry: " + mapEntry, e);
+ throw new RuntimeException(request + " failure", e);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java?rev=1589915&r1=1589914&r2=1589915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java Fri Apr 25 00:19:34 2014
@@ -27,43 +27,56 @@ import java.util.Arrays;
* A simple UDP based RPC client which just sends one request to a server.
*/
public class SimpleUdpClient {
+
protected final String host;
protected final int port;
protected final XDR request;
protected final boolean oneShot;
+ protected final DatagramSocket clientSocket;
- public SimpleUdpClient(String host, int port, XDR request) {
- this(host, port, request, true);
+ public SimpleUdpClient(String host, int port, XDR request,
+ DatagramSocket clientSocket) {
+ this(host, port, request, true, clientSocket);
}
- public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
+ public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
+ DatagramSocket clientSocket) {
this.host = host;
this.port = port;
this.request = request;
this.oneShot = oneShot;
+ this.clientSocket = clientSocket;
}
public void run() throws IOException {
- DatagramSocket clientSocket = new DatagramSocket();
InetAddress IPAddress = InetAddress.getByName(host);
byte[] sendData = request.getBytes();
byte[] receiveData = new byte[65535];
-
- DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
- IPAddress, port);
- clientSocket.send(sendPacket);
- DatagramPacket receivePacket = new DatagramPacket(receiveData,
- receiveData.length);
- clientSocket.receive(receivePacket);
-
- // Check reply status
- XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
- receivePacket.getLength()));
- RpcReply reply = RpcReply.read(xdr);
- if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
- throw new IOException("Request failed: " + reply.getState());
+ // Use the provided socket if there is one, else just make a new one.
+ DatagramSocket socket = this.clientSocket == null ?
+ new DatagramSocket() : this.clientSocket;
+
+ try {
+ DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
+ IPAddress, port);
+ socket.send(sendPacket);
+ DatagramPacket receivePacket = new DatagramPacket(receiveData,
+ receiveData.length);
+ socket.receive(receivePacket);
+
+ // Check reply status
+ XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
+ receivePacket.getLength()));
+ RpcReply reply = RpcReply.read(xdr);
+ if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
+ throw new IOException("Request failed: " + reply.getState());
+ }
+ } finally {
+ // A socket passed in by the caller remains the caller's responsibility
+ // to close; only a socket created inside this method is closed here.
+ if (this.clientSocket == null) {
+ socket.close();
+ }
}
-
- clientSocket.close();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1589915&r1=1589914&r2=1589915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Fri Apr 25 00:19:34 2014
@@ -51,7 +51,8 @@ public class TestFrameDecoder {
protected TestRpcProgram(String program, String host, int port,
int progNumber, int lowProgVersion, int highProgVersion) {
- super(program, host, port, progNumber, lowProgVersion, highProgVersion);
+ super(program, host, port, progNumber, lowProgVersion, highProgVersion,
+ null);
}
@Override