You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2013/07/22 06:11:38 UTC
svn commit: r1505588 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common:
./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Author: suresh
Date: Mon Jul 22 04:11:38 2013
New Revision: 1505588
URL: http://svn.apache.org/r1505588
Log:
HADOOP-9691. Merge 1505042 from branch-2
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt Mon Jul 22 04:11:38 2013
@@ -144,6 +144,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick
McCabe)
+ HADOOP-9691. RPC clients can generate call ID using AtomicInteger instead of
+ synchronizing on the Client instance. (cnauroth)
+
HADOOP-9661. Allow metrics sources to be extended. (sandyr via tucu)
HADOOP-9370. Write FSWrapper class to wrap FileSystem and FileContext for
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Jul 22 04:11:38 2013
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
@@ -105,7 +106,7 @@ public class Client {
new Hashtable<ConnectionId, Connection>();
private Class<? extends Writable> valueClass; // class of call values
- private int counter; // counter for call ids
+ private final AtomicInteger counter = new AtomicInteger(); // call ID sequence
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
@@ -218,9 +219,7 @@ public class Client {
protected Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;
- synchronized (Client.this) {
- this.id = counter++;
- }
+ this.id = nextCallId();
}
/** Indicate when the call is complete and the
@@ -1566,4 +1565,18 @@ public class Client {
return serverPrincipal + "@" + address;
}
}
+
+ /**
+ * Returns the next valid sequential call ID by atomically incrementing the
+ * counter and masking off the sign bit. Valid call IDs are non-negative ints
+ * in [ 0, 2^31 - 1 ]; negative numbers are reserved for special purposes.
+ * After the counter passes Integer.MAX_VALUE the masked value restarts at 0,
+ * so IDs are reused. Prior client versions did not mask off the sign bit, so
+ * a server may still see a negative call ID from an old client.
+ *
+ * @return int next valid call ID, always non-negative
+ */
+ private int nextCallId() {
+ return counter.getAndIncrement() & 0x7FFFFFFF; // keep low 31 bits only
+ }
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Mon Jul 22 04:11:38 2013
@@ -31,6 +31,7 @@ public class RpcConstants {
public static final byte[] DUMMY_CLIENT_ID = new byte[0];
+ public static final int INVALID_CALL_ID = -2;
/**
* The first four bytes of Hadoop RPC connections
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Jul 22 04:11:38 2013
@@ -268,6 +268,18 @@ public abstract class Server {
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+ /**
+ * Returns the sequential ID number of the RPC call held in the calling
+ * thread's CurCall thread-local. A negative call ID indicates an invalid value:
+ * RpcConstants.INVALID_CALL_ID is returned when no call is currently set.
+ *
+ * @return int call ID of the currently active RPC call, or INVALID_CALL_ID
+ */
+ public static int getCallId() {
+ Call call = CurCall.get(); // null when this thread has no call set
+ return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
+ }
+
/** Returns the remote side ip address when invoked inside an RPC
* Returns null in case of an error.
*/
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Jul 22 04:11:38 2013
@@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -83,6 +86,10 @@ public class TestIPC {
private static final File FD_DIR = new File("/proc/self/fd");
private static class TestServer extends Server {
+ // Tests can set callListener to run a piece of code each time the server
+ // receives a call. This code executes on the server thread, so it has
+ // visibility of that thread's thread-local storage.
+ private Runnable callListener;
private boolean sleep;
private Class<? extends Writable> responseClass;
@@ -108,6 +115,9 @@ public class TestIPC {
Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
} catch (InterruptedException e) {}
}
+ if (callListener != null) {
+ callListener.run();
+ }
if (responseClass != null) {
try {
return responseClass.newInstance();
@@ -627,6 +637,57 @@ public class TestIPC {
assertRetriesOnSocketTimeouts(conf, 4);
}
+ /**
+ * Tests that one client shared by many caller threads generates a unique call
+ * ID per RPC call, and that the IDs seen by the server are exactly 0..N-1.
+ */
+ @Test
+ public void testUniqueSequentialCallIds() throws Exception {
+ int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
+ TestServer server = new TestServer(serverThreads, false);
+
+ // Attach a listener that records the call ID of every call the server
+ // handles. The list is synchronized because several server threads add to it.
+ final List<Integer> callIds = Collections.synchronizedList(
+ new ArrayList<Integer>());
+ server.callListener = new Runnable() {
+ @Override
+ public void run() {
+ callIds.add(Server.getCallId()); // ID of the call being handled
+ }
+ };
+
+ Client client = new Client(LongWritable.class, conf);
+
+ try {
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ SerialCaller[] callers = new SerialCaller[callerCount];
+ for (int i = 0; i < callerCount; ++i) {
+ callers[i] = new SerialCaller(client, addr, perCallerCallCount); // same client for all
+ callers[i].start();
+ }
+ for (int i = 0; i < callerCount; ++i) {
+ callers[i].join(); // wait for each caller before checking its result
+ assertFalse(callers[i].failed);
+ }
+ } finally {
+ client.stop();
+ server.stop();
+ }
+
+ int expectedCallCount = callerCount * perCallerCallCount; // 100 * 100 calls
+ assertEquals(expectedCallCount, callIds.size());
+
+ // The server is not guaranteed to execute requests in client call ID order,
+ // so sort the recorded IDs first. After sorting, position i must hold ID i,
+ // which rules out both duplicates and gaps.
+ Collections.sort(callIds);
+ for (int i = 0; i < expectedCallCount; ++i) {
+ assertEquals(i, callIds.get(i).intValue());
+ }
+ }
+
private void assertRetriesOnSocketTimeouts(Configuration conf,
int maxTimeoutRetries) throws IOException, InterruptedException {
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);