You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/07/19 23:49:58 UTC

svn commit: r1505036 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/main/proto/ src/test/java/org/apache/hadoop/ipc/

Author: szetszwo
Date: Fri Jul 19 21:49:58 2013
New Revision: 1505036

URL: http://svn.apache.org/r1505036
Log:
HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto.

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1505036&r1=1505035&r2=1505036&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Jul 19 21:49:58 2013
@@ -475,6 +475,9 @@ Release 2.1.0-beta - 2013-07-02
 
     HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
 
+    HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1505036&r1=1505035&r2=1505036&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Jul 19 21:49:58 2013
@@ -35,6 +35,7 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -276,6 +277,24 @@ public class Client {
     return refCount==0;
   }
 
+  /** Check the rpc response header. */
+  void checkResponse(RpcResponseHeaderProto header) throws IOException {
+    if (header == null) {
+      throw new IOException("Response is null.");
+    }
+    if (header.hasClientId()) {
+      // check client IDs
+      final byte[] id = header.getClientId().toByteArray();
+      if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) {
+        if (!Arrays.equals(id, clientId)) {
+          throw new IOException("Client IDs not matched: local ID="
+              + StringUtils.byteToHexString(clientId) + ", ID in reponse="
+              + StringUtils.byteToHexString(header.getClientId().toByteArray()));
+        }
+      }
+    }
+  }
+
   Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
     return new Call(rpcKind, rpcRequest);
   }
@@ -1054,9 +1073,8 @@ public class Client {
         int totalLen = in.readInt();
         RpcResponseHeaderProto header = 
             RpcResponseHeaderProto.parseDelimitedFrom(in);
-        if (header == null) {
-          throw new IOException("Response is null.");
-        }
+        checkResponse(header);
+
         int headerLen = header.getSerializedSize();
         headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1505036&r1=1505035&r2=1505036&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Jul 19 21:49:58 2013
@@ -2289,7 +2289,9 @@ public abstract class Server {
     DataOutputStream out = new DataOutputStream(responseBuf);
     RpcResponseHeaderProto.Builder headerBuilder =  
         RpcResponseHeaderProto.newBuilder();
+    headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
     headerBuilder.setCallId(call.callId);
+    headerBuilder.setRetryCount(call.retryCount);
     headerBuilder.setStatus(status);
     headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto?rev=1505036&r1=1505035&r2=1505036&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto Fri Jul 19 21:49:58 2013
@@ -130,6 +130,8 @@ message RpcResponseHeaderProto {
   optional string exceptionClassName = 4;  // if request fails
   optional string errorMsg = 5;  // if request fails, often contains strack trace
   optional RpcErrorCodeProto errorDetail = 6; // in case of error
+  optional bytes clientId = 7; // Globally unique client ID
+  optional sint32 retryCount = 8 [default = -1];
 }
 
 message RpcSaslProto {
@@ -153,4 +155,4 @@ message RpcSaslProto {
   required SaslState state = 2;
   optional bytes token     = 3;
   repeated SaslAuth auths  = 4;
-}
\ No newline at end of file
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1505036&r1=1505035&r2=1505036&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Fri Jul 19 21:49:58 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.io.retry.RetryP
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -708,33 +709,45 @@ public class TestIPC {
     assertRetriesOnSocketTimeouts(conf, 4);
   }
 
-  private static class CallId {
+  private static class CallInfo {
     int id = RpcConstants.INVALID_CALL_ID;
+    int retry = RpcConstants.INVALID_RETRY_COUNT;
   }
 
   /**
-   * Test if the rpc server uses the call id generated by the rpc client.
+   * Test if
+   * (1) the rpc server uses the call id/retry provided by the rpc client, and
+   * (2) the rpc client receives the same call id/retry from the rpc server.
    */
   @Test
-  public void testCallIds() throws Exception {
-    final CallId callId = new CallId();
+  public void testCallIdAndRetry() throws Exception {
+    final CallInfo info = new CallInfo();
 
-    // Override client to store the call id
+    // Override client to store the call info and check response
     final Client client = new Client(LongWritable.class, conf) {
       @Override
       Call createCall(RpcKind rpcKind, Writable rpcRequest) {
         final Call call = super.createCall(rpcKind, rpcRequest);
-        callId.id = call.id;
+        info.id = call.id;
+        info.retry = call.retry;
         return call;
       }
+      
+      @Override
+      void checkResponse(RpcResponseHeaderProto header) throws IOException {
+        super.checkResponse(header);
+        Assert.assertEquals(info.id, header.getCallId());
+        Assert.assertEquals(info.retry, header.getRetryCount());
+      }
     };
 
-    // Attach a listener that tracks every call ID received by the server.
+    // Attach a listener that tracks every call received by the server.
     final TestServer server = new TestServer(1, false);
     server.callListener = new Runnable() {
       @Override
       public void run() {
-        Assert.assertEquals(callId.id, Server.getCallId());
+        Assert.assertEquals(info.id, Server.getCallId());
+        Assert.assertEquals(info.retry, Server.getCallRetryCount());
       }
     };