You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2013/07/22 06:19:28 UTC

svn commit: r1505590 - in /hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/security/ src/main/java/org/apache/...

Author: suresh
Date: Mon Jul 22 04:19:28 2013
New Revision: 1505590

URL: http://svn.apache.org/r1505590
Log:
HADOOP-9717. Merge 1505053 from branch-2

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/CHANGES.txt Mon Jul 22 04:19:28 2013
@@ -173,6 +173,8 @@ Release 2.1.0-beta - 2013-07-02
     HADOOP-9716. Rpc retries should use the same call ID as the original call.
     (szetszwo)
 
+    HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Mon Jul 22 04:19:28 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.ipc.RpcConstant
 import org.apache.hadoop.ipc.RpcInvocationHandler;
 import org.apache.hadoop.util.ThreadUtil;
 
+import com.google.common.base.Preconditions;
+
 class RetryInvocationHandler implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   private final FailoverProxyProvider proxyProvider;
@@ -87,7 +89,7 @@ class RetryInvocationHandler implements 
       }
 
       if (isRpc) {
-        Client.setCallId(callId);
+        Client.setCallIdAndRetryCount(callId, retries);
       }
       try {
         Object ret = invokeMethod(method, args);
@@ -97,8 +99,8 @@ class RetryInvocationHandler implements 
         boolean isMethodIdempotent = proxyProvider.getInterface()
             .getMethod(method.getName(), method.getParameterTypes())
             .isAnnotationPresent(Idempotent.class);
-        RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
-            isMethodIdempotent);
+        RetryAction action = policy.shouldRetry(e, retries++,
+            invocationFailoverCount, isMethodIdempotent);
         if (action.action == RetryAction.RetryDecision.FAIL) {
           if (action.reason != null) {
             LOG.warn("Exception while invoking " + 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Jul 22 04:19:28 2013
@@ -107,12 +107,16 @@ public class Client {
   private static final AtomicInteger callIdCounter = new AtomicInteger();
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
+  private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
 
-  /** Set call id for the next call. */
-  public static void setCallId(int cid) {
+  /** Set call id and retry count for the next call. */
+  public static void setCallIdAndRetryCount(int cid, int rc) {
     Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
     Preconditions.checkState(callId.get() == null);
+    Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
+
     callId.set(cid);
+    retryCount.set(rc);
   }
 
   private Hashtable<ConnectionId, Connection> connections =
@@ -226,6 +230,7 @@ public class Client {
    */
   static class Call {
     final int id;               // call id
+    final int retry;           // retry count
     final Writable rpcRequest;  // the serialized rpc request
     Writable rpcResponse;       // null if rpc has error
     IOException error;          // exception, null if success
@@ -243,6 +248,13 @@ public class Client {
         callId.set(null);
         this.id = id;
       }
+      
+      final Integer rc = retryCount.get();
+      if (rc == null) {
+        this.retry = 0;
+      } else {
+        this.retry = rc;
+      }
     }
 
     /** Indicate when the call is complete and the
@@ -813,7 +825,7 @@ public class Client {
       RpcRequestHeaderProto connectionContextHeader = ProtoUtil
           .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
               OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
-              clientId);
+              RpcConstants.INVALID_RETRY_COUNT, clientId);
       RpcRequestMessageWrapper request =
           new RpcRequestMessageWrapper(connectionContextHeader, message);
       
@@ -920,7 +932,8 @@ public class Client {
       // Items '1' and '2' are prepared here. 
       final DataOutputBuffer d = new DataOutputBuffer();
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
-         call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId);
+          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
+          clientId);
       header.writeDelimitedTo(d);
       call.rpcRequest.write(d);
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Mon Jul 22 04:19:28 2013
@@ -33,6 +33,8 @@ public class RpcConstants {
   
   public static final int INVALID_CALL_ID = -2;
   
+  public static final int INVALID_RETRY_COUNT = -1;
+  
   /**
    * The first four bytes of Hadoop RPC connections
    */

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Jul 22 04:19:28 2013
@@ -279,6 +279,15 @@ public abstract class Server {
     Call call = CurCall.get();
     return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
   }
+  
+  /**
+   * @return The current active RPC call's retry count. -1 indicates there is
+   *         no active call or the retry cache is not supported on the client side.
+   */
+  public static int getCallRetryCount() {
+    Call call = CurCall.get();
+    return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
+  }
 
   /** Returns the remote side ip address when invoked inside an RPC 
    *  Returns null incase of an error.
@@ -453,6 +462,7 @@ public abstract class Server {
   /** A call queued for handling. */
   private static class Call {
     private final int callId;             // the client's call id
+    private final int retryCount;        // the retry count of the call
     private final Writable rpcRequest;    // Serialized Rpc request from client
     private final Connection connection;  // connection to client
     private long timestamp;               // time received when response is null
@@ -461,14 +471,16 @@ public abstract class Server {
     private final RPC.RpcKind rpcKind;
     private final byte[] clientId;
 
-    private Call(int id, Writable param, Connection connection) {
-      this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
+    private Call(int id, int retryCount, Writable param, 
+        Connection connection) {
+      this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
           RpcConstants.DUMMY_CLIENT_ID);
     }
 
-    private Call(int id, Writable param, Connection connection,
+    private Call(int id, int retryCount, Writable param, Connection connection,
         RPC.RpcKind kind, byte[] clientId) {
       this.callId = id;
+      this.retryCount = retryCount;
       this.rpcRequest = param;
       this.connection = connection;
       this.timestamp = Time.now();
@@ -479,7 +491,8 @@ public abstract class Server {
     
     @Override
     public String toString() {
-      return rpcRequest + " from " + connection + " Call#" + callId;
+      return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+          + retryCount;
     }
 
     public void setResponse(ByteBuffer response) {
@@ -1160,11 +1173,12 @@ public abstract class Server {
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
     private static final int CONNECTION_CONTEXT_CALL_ID = -3;
     
-    private final Call authFailedCall = 
-      new Call(AUTHORIZATION_FAILED_CALLID, null, this);
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+        RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     
-    private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this);
+    private final Call saslCall = new Call(AuthProtocol.SASL.callId,
+        RpcConstants.INVALID_RETRY_COUNT, null, this);
     private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
     
     private boolean sentNegotiate = false;
@@ -1592,20 +1606,23 @@ public abstract class Server {
       
       if (clientVersion >= 9) {
         // Versions >>9  understand the normal response
-        Call fakeCall =  new Call(-1, null, this);
+        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+            this);
         setupResponse(buffer, fakeCall, 
             RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
             null, VersionMismatch.class.getName(), errMsg);
         responder.doRespond(fakeCall);
       } else if (clientVersion >= 3) {
-        Call fakeCall =  new Call(-1, null, this);
+        Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+            this);
         // Versions 3 to 8 use older response
         setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
         responder.doRespond(fakeCall);
       } else if (clientVersion == 2) { // Hadoop 0.18.3
-        Call fakeCall =  new Call(0, null, this);
+        Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
+            this);
         DataOutputStream out = new DataOutputStream(buffer);
         out.writeInt(0); // call ID
         out.writeBoolean(true); // error
@@ -1618,7 +1635,7 @@ public abstract class Server {
     }
     
     private void setupHttpRequestOnIpcPortResponse() throws IOException {
-      Call fakeCall =  new Call(0, null, this);
+      Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
       fakeCall.setResponse(ByteBuffer.wrap(
           RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
       responder.doRespond(fakeCall);
@@ -1750,12 +1767,14 @@ public abstract class Server {
     private void processOneRpc(byte[] buf)
         throws IOException, WrappedRpcServerException, InterruptedException {
       int callId = -1;
+      int retry = RpcConstants.INVALID_RETRY_COUNT;
       try {
         final DataInputStream dis =
             new DataInputStream(new ByteArrayInputStream(buf));
         final RpcRequestHeaderProto header =
             decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
         callId = header.getCallId();
+        retry = header.getRetryCount();
         if (LOG.isDebugEnabled()) {
           LOG.debug(" got #" + callId);
         }
@@ -1772,7 +1791,7 @@ public abstract class Server {
         }
       } catch (WrappedRpcServerException wrse) { // inform client of error
         Throwable ioe = wrse.getCause();
-        final Call call = new Call(callId, null, this);
+        final Call call = new Call(callId, retry, null, this);
         setupResponse(authFailedResponse, call,
             RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
             ioe.getClass().getName(), ioe.getMessage());
@@ -1846,9 +1865,9 @@ public abstract class Server {
             RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
       }
         
-      Call call = new Call(header.getCallId(), rpcRequest, this,
-          ProtoUtil.convert(header.getRpcKind()), header.getClientId()
-              .toByteArray());
+      Call call = new Call(header.getCallId(), header.getRetryCount(),
+          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
+              .getClientId().toByteArray());
       callQueue.put(call);              // queue the call; maybe blocked here
       incRpcCount();  // Increment the rpc count
     }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Mon Jul 22 04:19:28 2013
@@ -75,7 +75,7 @@ public class SaslRpcClient {
   private static final RpcRequestHeaderProto saslHeader = ProtoUtil
       .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
           OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
-          RpcConstants.DUMMY_CLIENT_ID);
+          RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
   private static final RpcSaslProto negotiateRequest =
       RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
   

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java Mon Jul 22 04:19:28 2013
@@ -160,10 +160,11 @@ public abstract class ProtoUtil {
   }
  
   public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
-      RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) {
+      RpcRequestHeaderProto.OperationProto operation, int callId,
+      int retryCount, byte[] uuid) {
     RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
     result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
-        .setClientId(ByteString.copyFrom(uuid));
+        .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
     return result.build();
   }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto Mon Jul 22 04:19:28 2013
@@ -65,6 +65,8 @@ message RpcRequestHeaderProto { // the h
   required uint32 callId = 3; // a sequence number that is sent back in response
   required bytes clientId = 4; // Globally unique client ID
   // clientId + callId uniquely identifies a request
+  // retry count, 1 means this is the first retry
+  optional sint32 retryCount = 5 [default = -1];
 }
 
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Jul 22 04:19:28 2013
@@ -35,6 +35,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -53,6 +55,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.Connection;
 import org.apache.hadoop.net.ConnectTimeoutException;
@@ -171,6 +176,45 @@ public class TestIPC {
     }
   }
 
+  /**
+   * A RpcInvocationHandler instance for test. Its invoke function uses the same
+   * {@link Client} instance, and will fail the first totalRetry times (by 
+   * throwing an IOException).
+   */
+  private static class TestInvocationHandler implements RpcInvocationHandler {
+    private static int retry = 0;
+    private final Client client;
+    private final Server server;
+    private final int total;
+    
+    TestInvocationHandler(Client client, Server server, int total) {
+      this.client = client;
+      this.server = server;
+      this.total = total;
+    }
+    
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      LongWritable param = new LongWritable(RANDOM.nextLong());
+      LongWritable value = (LongWritable) client.call(param,
+          NetUtils.getConnectAddress(server), null, null, 0, conf);
+      if (retry++ < total) {
+        throw new IOException("Fake IOException");
+      } else {
+        return value;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {}
+    
+    @Override
+    public ConnectionId getConnectionId() {
+      return null;
+    }
+  }
+  
   @Test
   public void testSerial() throws Exception {
     testSerial(3, false, 2, 5, 100);
@@ -687,6 +731,110 @@ public class TestIPC {
       server.stop();
     }
   }
+  
+  /** A dummy protocol */
+  private interface DummyProtocol {
+    public void dummyRun();
+  }
+  
+  /**
+   * Test the retry count while used in a retry proxy.
+   */
+  @Test
+  public void testRetryProxy() throws Exception {
+    final Client client = new Client(LongWritable.class, conf);
+    
+    final TestServer server = new TestServer(1, false);
+    server.callListener = new Runnable() {
+      private int retryCount = 0;
+      @Override
+      public void run() {
+        Assert.assertEquals(retryCount++, Server.getCallRetryCount());
+      }
+    };
+
+    final int totalRetry = 256;
+    DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
+        DummyProtocol.class.getClassLoader(),
+        new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
+            server, totalRetry));
+    DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create(
+        DummyProtocol.class, proxy, RetryPolicies.RETRY_FOREVER);
+    
+    try {
+      server.start();
+      retryProxy.dummyRun();
+      Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
+    } finally {
+      Client.setCallIdAndRetryCount(0, 0);
+      client.stop();
+      server.stop();
+    }
+  }
+  
+  /**
+   * Test if the rpc server gets the default retry count (0) from client.
+   */
+  @Test
+  public void testInitialCallRetryCount() throws Exception {
+    // Plain client: no retry count is set explicitly for its calls
+    final Client client = new Client(LongWritable.class, conf);
+
+    // Attach a listener that checks the retry count of every call received by the server.
+    final TestServer server = new TestServer(1, false);
+    server.callListener = new Runnable() {
+      @Override
+      public void run() {
+        // we have not set the retry count for the client, thus on the server
+        // side we should see retry count as 0
+        Assert.assertEquals(0, Server.getCallRetryCount());
+      }
+    };
+
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      caller.run();
+      assertFalse(caller.failed);
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
+  
+  /**
+   * Test if the rpc server gets the retry count from client.
+   */
+  @Test
+  public void testCallRetryCount() throws Exception {
+    final int retryCount = 255;
+    // Plain client; the retry count for the next call is set explicitly below
+    final Client client = new Client(LongWritable.class, conf);
+    Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
+
+    // Attach a listener that checks the retry count of every call received by the server.
+    final TestServer server = new TestServer(1, false);
+    server.callListener = new Runnable() {
+      @Override
+      public void run() {
+        // we have set the retry count explicitly on the client, thus on the
+        // server side we should see that same retry count
+        Assert.assertEquals(retryCount, Server.getCallRetryCount());
+      }
+    };
+
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      final SerialCaller caller = new SerialCaller(client, addr, 10);
+      caller.run();
+      assertFalse(caller.failed);
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
 
   /**
    * Tests that client generates a unique sequential call ID for each RPC call,

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java?rev=1505590&r1=1505589&r2=1505590&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java Mon Jul 22 04:19:28 2013
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
 import org.junit.Test;
@@ -79,7 +80,8 @@ public class TestProtoUtil {
   public void testRpcClientId() {
     byte[] uuid = StringUtils.getUuidBytes();
     RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
-        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid);
+        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
+        RpcConstants.INVALID_RETRY_COUNT, uuid);
     assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
   }
 }