You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2013/07/22 06:11:38 UTC

svn commit: r1505588 - in /hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/

Author: suresh
Date: Mon Jul 22 04:11:38 2013
New Revision: 1505588

URL: http://svn.apache.org/r1505588
Log:
HADOOP-9691. Merge 1505042 from branch-2

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt Mon Jul 22 04:11:38 2013
@@ -144,6 +144,9 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9676.  Make maximum RPC buffer size configurable (Colin Patrick
     McCabe)
 
+    HADOOP-9691. RPC clients can generate call ID using AtomicInteger instead of
+    synchronizing on the Client instance. (cnauroth)
+
     HADOOP-9661. Allow metrics sources to be extended. (sandyr via tucu)
 
     HADOOP-9370.  Write FSWrapper class to wrap FileSystem and FileContext for

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Jul 22 04:11:38 2013
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.SocketFactory;
@@ -105,7 +106,7 @@ public class Client {
     new Hashtable<ConnectionId, Connection>();
 
   private Class<? extends Writable> valueClass;   // class of call values
-  private int counter;                            // counter for call ids
+  private final AtomicInteger counter = new AtomicInteger(); // call ID sequence
   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final private Configuration conf;
 
@@ -218,9 +219,7 @@ public class Client {
     protected Call(RPC.RpcKind rpcKind, Writable param) {
       this.rpcKind = rpcKind;
       this.rpcRequest = param;
-      synchronized (Client.this) {
-        this.id = counter++;
-      }
+      this.id = nextCallId();
     }
 
     /** Indicate when the call is complete and the
@@ -1566,4 +1565,18 @@ public class Client {
       return serverPrincipal + "@" + address;
     }
   }  
+
+  /**
+   * Returns the next valid sequential call ID by incrementing an atomic counter
+   * and masking off the sign bit.  Valid call IDs are non-negative integers in
+   * the range [ 0, 2^31 - 1 ].  Negative numbers are reserved for special
+   * purposes.  The values can overflow back to 0 and be reused.  Note that prior
+   * versions of the client did not mask off the sign bit, so a server may still
+   * see a negative call ID if it receives connections from an old client.
+   * 
+   * @return int next valid call ID
+   */
+  private int nextCallId() {
+    return counter.getAndIncrement() & 0x7FFFFFFF;
+  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Mon Jul 22 04:11:38 2013
@@ -31,6 +31,7 @@ public class RpcConstants {
   
   public static final byte[] DUMMY_CLIENT_ID = new byte[0];
   
+  public static final int INVALID_CALL_ID = -2;
   
   /**
    * The first four bytes of Hadoop RPC connections

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Jul 22 04:11:38 2013
@@ -268,6 +268,18 @@ public abstract class Server {
    */
   private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
   
+  /**
+   * Returns the currently active RPC call's sequential ID number.  A negative
+   * call ID indicates an invalid value, such as if there is no currently active
+   * RPC call.
+   * 
+   * @return int sequential ID number of currently active RPC call
+   */
+  public static int getCallId() {
+    Call call = CurCall.get();
+    return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
+  }
+
   /** Returns the remote side ip address when invoked inside an RPC 
    *  Returns null incase of an error.
    */

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1505588&r1=1505587&r2=1505588&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Jul 22 04:11:38 2013
@@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -83,6 +86,10 @@ public class TestIPC {
   private static final File FD_DIR = new File("/proc/self/fd");
 
   private static class TestServer extends Server {
+    // Tests can set callListener to run a piece of code each time the server
+    // receives a call.  This code executes on the server thread, so it has
+    // visibility of that thread's thread-local storage.
+    private Runnable callListener;
     private boolean sleep;
     private Class<? extends Writable> responseClass;
 
@@ -108,6 +115,9 @@ public class TestIPC {
           Thread.sleep(RANDOM.nextInt(PING_INTERVAL) + MIN_SLEEP_TIME);
         } catch (InterruptedException e) {}
       }
+      if (callListener != null) {
+        callListener.run();
+      }
       if (responseClass != null) {
         try {
           return responseClass.newInstance();
@@ -627,6 +637,57 @@ public class TestIPC {
     assertRetriesOnSocketTimeouts(conf, 4);
   }
 
+  /**
+   * Tests that client generates a unique sequential call ID for each RPC call,
+   * even if multiple threads are using the same client.
+   */
+  @Test
+  public void testUniqueSequentialCallIds() throws Exception {
+    int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
+    TestServer server = new TestServer(serverThreads, false);
+
+    // Attach a listener that tracks every call ID received by the server.  This
+    // list must be synchronized, because multiple server threads will add to it.
+    final List<Integer> callIds = Collections.synchronizedList(
+      new ArrayList<Integer>());
+    server.callListener = new Runnable() {
+      @Override
+      public void run() {
+        callIds.add(Server.getCallId());
+      }
+    };
+
+    Client client = new Client(LongWritable.class, conf);
+
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      SerialCaller[] callers = new SerialCaller[callerCount];
+      for (int i = 0; i < callerCount; ++i) {
+        callers[i] = new SerialCaller(client, addr, perCallerCallCount);
+        callers[i].start();
+      }
+      for (int i = 0; i < callerCount; ++i) {
+        callers[i].join();
+        assertFalse(callers[i].failed);
+      }
+    } finally {
+      client.stop();
+      server.stop();
+    }
+
+    int expectedCallCount = callerCount * perCallerCallCount;
+    assertEquals(expectedCallCount, callIds.size());
+
+    // It is not guaranteed that the server executes requests in sequential order
+    // of client call ID, so we must sort the call IDs before checking that it
+    // contains every expected value.
+    Collections.sort(callIds);
+    for (int i = 0; i < expectedCallCount; ++i) {
+      assertEquals(i, callIds.get(i).intValue());
+    }
+  }
+
   private void assertRetriesOnSocketTimeouts(Configuration conf,
       int maxTimeoutRetries) throws IOException, InterruptedException {
     SocketFactory mockFactory = Mockito.mock(SocketFactory.class);