You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2013/07/20 00:20:10 UTC
svn commit: r1505053 - in
/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/io/retry/
src/main/java/org/apache/hadoop/ipc/
src/main/java/org/apache/hadoop/security/ src/main/java/org/apache/hadoop/...
Author: suresh
Date: Fri Jul 19 22:20:09 2013
New Revision: 1505053
URL: http://svn.apache.org/r1505053
Log:
HADOOP-9717. Merge r1504725 from trunk.
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri Jul 19 22:20:09 2013
@@ -203,6 +203,8 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9716. Rpc retries should use the same call ID as the original call.
(szetszwo)
+ HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
+
OPTIMIZATIONS
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Fri Jul 19 22:20:09 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.ipc.RpcConstant
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.util.ThreadUtil;
+import com.google.common.base.Preconditions;
+
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private final FailoverProxyProvider proxyProvider;
@@ -87,7 +89,7 @@ class RetryInvocationHandler implements
}
if (isRpc) {
- Client.setCallId(callId);
+ Client.setCallIdAndRetryCount(callId, retries);
}
try {
Object ret = invokeMethod(method, args);
@@ -97,8 +99,8 @@ class RetryInvocationHandler implements
boolean isMethodIdempotent = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
- RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
- isMethodIdempotent);
+ RetryAction action = policy.shouldRetry(e, retries++,
+ invocationFailoverCount, isMethodIdempotent);
if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) {
LOG.warn("Exception while invoking " +
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Jul 19 22:20:09 2013
@@ -107,12 +107,16 @@ public class Client {
private static final AtomicInteger callIdCounter = new AtomicInteger();
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
+ private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
- /** Set call id for the next call. */
- public static void setCallId(int cid) {
+ /** Set call id and retry count for the next call. */
+ public static void setCallIdAndRetryCount(int cid, int rc) {
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null);
+ Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
+
callId.set(cid);
+ retryCount.set(rc);
}
private Hashtable<ConnectionId, Connection> connections =
@@ -279,6 +283,7 @@ public class Client {
*/
static class Call {
final int id; // call id
+ final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
@@ -296,6 +301,13 @@ public class Client {
callId.set(null);
this.id = id;
}
+
+ final Integer rc = retryCount.get();
+ if (rc == null) {
+ this.retry = 0;
+ } else {
+ this.retry = rc;
+ }
}
/** Indicate when the call is complete and the
@@ -866,7 +878,7 @@ public class Client {
RpcRequestHeaderProto connectionContextHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
- clientId);
+ RpcConstants.INVALID_RETRY_COUNT, clientId);
RpcRequestMessageWrapper request =
new RpcRequestMessageWrapper(connectionContextHeader, message);
@@ -974,7 +986,8 @@ public class Client {
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
- call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId);
+ call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
+ clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Fri Jul 19 22:20:09 2013
@@ -33,6 +33,8 @@ public class RpcConstants {
public static final int INVALID_CALL_ID = -2;
+ public static final int INVALID_RETRY_COUNT = -1;
+
/**
* The first four bytes of Hadoop RPC connections
*/
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Jul 19 22:20:09 2013
@@ -279,6 +279,15 @@ public abstract class Server {
Call call = CurCall.get();
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
}
+
+ /**
+ * @return The current active RPC call's retry count. -1 indicates the retry
+ * cache is not supported in the client side.
+ */
+ public static int getCallRetryCount() {
+ Call call = CurCall.get();
+ return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
+ }
/** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error.
@@ -453,6 +462,7 @@ public abstract class Server {
/** A call queued for handling. */
private static class Call {
private final int callId; // the client's call id
+ private final int retryCount; // the retry count of the call
private final Writable rpcRequest; // Serialized Rpc request from client
private final Connection connection; // connection to client
private long timestamp; // time received when response is null
@@ -461,14 +471,16 @@ public abstract class Server {
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
- private Call(int id, Writable param, Connection connection) {
- this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
+ private Call(int id, int retryCount, Writable param,
+ Connection connection) {
+ this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID);
}
- private Call(int id, Writable param, Connection connection,
+ private Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) {
this.callId = id;
+ this.retryCount = retryCount;
this.rpcRequest = param;
this.connection = connection;
this.timestamp = Time.now();
@@ -479,7 +491,8 @@ public abstract class Server {
@Override
public String toString() {
- return rpcRequest + " from " + connection + " Call#" + callId;
+ return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+ + retryCount;
}
public void setResponse(ByteBuffer response) {
@@ -1160,11 +1173,12 @@ public abstract class Server {
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private static final int CONNECTION_CONTEXT_CALL_ID = -3;
- private final Call authFailedCall =
- new Call(AUTHORIZATION_FAILED_CALLID, null, this);
+ private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+ RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
- private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this);
+ private final Call saslCall = new Call(AuthProtocol.SASL.callId,
+ RpcConstants.INVALID_RETRY_COUNT, null, this);
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
private boolean sentNegotiate = false;
@@ -1592,20 +1606,23 @@ public abstract class Server {
if (clientVersion >= 9) {
// Versions >>9 understand the normal response
- Call fakeCall = new Call(-1, null, this);
+ Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
setupResponse(buffer, fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion >= 3) {
- Call fakeCall = new Call(-1, null, this);
+ Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
// Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion == 2) { // Hadoop 0.18.3
- Call fakeCall = new Call(0, null, this);
+ Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID
out.writeBoolean(true); // error
@@ -1618,7 +1635,7 @@ public abstract class Server {
}
private void setupHttpRequestOnIpcPortResponse() throws IOException {
- Call fakeCall = new Call(0, null, this);
+ Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
responder.doRespond(fakeCall);
@@ -1750,12 +1767,14 @@ public abstract class Server {
private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
+ int retry = RpcConstants.INVALID_RETRY_COUNT;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();
+ retry = header.getRetryCount();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
@@ -1772,7 +1791,7 @@ public abstract class Server {
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
- final Call call = new Call(callId, null, this);
+ final Call call = new Call(callId, retry, null, this);
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
@@ -1846,9 +1865,9 @@ public abstract class Server {
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
- Call call = new Call(header.getCallId(), rpcRequest, this,
- ProtoUtil.convert(header.getRpcKind()), header.getClientId()
- .toByteArray());
+ Call call = new Call(header.getCallId(), header.getRetryCount(),
+ rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
+ .getClientId().toByteArray());
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java Fri Jul 19 22:20:09 2013
@@ -75,7 +75,7 @@ public class SaslRpcClient {
private static final RpcRequestHeaderProto saslHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
- RpcConstants.DUMMY_CLIENT_ID);
+ RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
private static final RpcSaslProto negotiateRequest =
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java Fri Jul 19 22:20:09 2013
@@ -160,10 +160,11 @@ public abstract class ProtoUtil {
}
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
- RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) {
+ RpcRequestHeaderProto.OperationProto operation, int callId,
+ int retryCount, byte[] uuid) {
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
- .setClientId(ByteString.copyFrom(uuid));
+ .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
return result.build();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto Fri Jul 19 22:20:09 2013
@@ -65,6 +65,8 @@ message RpcRequestHeaderProto { // the h
required uint32 callId = 3; // a sequence number that is sent back in response
required bytes clientId = 4; // Globally unique client ID
// clientId + callId uniquely identifies a request
+ // retry count: 0 is the initial attempt, 1 means this is the first retry, -1 (default) means not set
+ optional sint32 retryCount = 5 [default = -1];
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Fri Jul 19 22:20:09 2013
@@ -35,6 +35,8 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -53,6 +55,9 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.net.ConnectTimeoutException;
@@ -171,6 +176,45 @@ public class TestIPC {
}
}
+ /**
+ * A RpcInvocationHandler instance for test. Its invoke function uses the same
+ * {@link Client} instance, and will fail the first totalRetry times (by
+ * throwing an IOException).
+ */
+ private static class TestInvocationHandler implements RpcInvocationHandler {
+ private static int retry = 0;
+ private final Client client;
+ private final Server server;
+ private final int total;
+
+ TestInvocationHandler(Client client, Server server, int total) {
+ this.client = client;
+ this.server = server;
+ this.total = total;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ LongWritable param = new LongWritable(RANDOM.nextLong());
+ LongWritable value = (LongWritable) client.call(param,
+ NetUtils.getConnectAddress(server), null, null, 0, conf);
+ if (retry++ < total) {
+ throw new IOException("Fake IOException");
+ } else {
+ return value;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public ConnectionId getConnectionId() {
+ return null;
+ }
+ }
+
@Test
public void testSerial() throws Exception {
testSerial(3, false, 2, 5, 100);
@@ -705,6 +749,110 @@ public class TestIPC {
server.stop();
}
}
+
+ /** A dummy protocol */
+ private interface DummyProtocol {
+ public void dummyRun();
+ }
+
+ /**
+ * Test the retry count while used in a retry proxy.
+ */
+ @Test
+ public void testRetryProxy() throws Exception {
+ final Client client = new Client(LongWritable.class, conf);
+
+ final TestServer server = new TestServer(1, false);
+ server.callListener = new Runnable() {
+ private int retryCount = 0;
+ @Override
+ public void run() {
+ Assert.assertEquals(retryCount++, Server.getCallRetryCount());
+ }
+ };
+
+ final int totalRetry = 256;
+ DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
+ DummyProtocol.class.getClassLoader(),
+ new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
+ server, totalRetry));
+ DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create(
+ DummyProtocol.class, proxy, RetryPolicies.RETRY_FOREVER);
+
+ try {
+ server.start();
+ retryProxy.dummyRun();
+ Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
+ } finally {
+ Client.setCallIdAndRetryCount(0, 0);
+ client.stop();
+ server.stop();
+ }
+ }
+
+ /**
+ * Test if the rpc server gets the default retry count (0) from client.
+ */
+ @Test
+ public void testInitialCallRetryCount() throws Exception {
+ // Create a plain client; no retry count is set for it
+ final Client client = new Client(LongWritable.class, conf);
+
+ // Attach a listener that checks the retry count of every call received by the server.
+ final TestServer server = new TestServer(1, false);
+ server.callListener = new Runnable() {
+ @Override
+ public void run() {
+ // we have not set the retry count for the client, thus on the server
+ // side we should see retry count as 0
+ Assert.assertEquals(0, Server.getCallRetryCount());
+ }
+ };
+
+ try {
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
+ caller.run();
+ assertFalse(caller.failed);
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
+
+ /**
+ * Test if the rpc server gets the retry count from client.
+ */
+ @Test
+ public void testCallRetryCount() throws Exception {
+ final int retryCount = 255;
+ // Create a client and preset the call id and retry count for this thread
+ final Client client = new Client(LongWritable.class, conf);
+ Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
+
+ // Attach a listener that checks the retry count of every call received by the server.
+ final TestServer server = new TestServer(1, false);
+ server.callListener = new Runnable() {
+ @Override
+ public void run() {
+ // the retry count was set explicitly on the client, thus on the server
+ // side we should see that same retry count
+ Assert.assertEquals(retryCount, Server.getCallRetryCount());
+ }
+ };
+
+ try {
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
+ caller.run();
+ assertFalse(caller.failed);
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
/**
* Tests that client generates a unique sequential call ID for each RPC call,
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java?rev=1505053&r1=1505052&r2=1505053&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java Fri Jul 19 22:20:09 2013
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.junit.Test;
@@ -79,7 +80,8 @@ public class TestProtoUtil {
public void testRpcClientId() {
byte[] uuid = StringUtils.getUuidBytes();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
- RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid);
+ RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
+ RpcConstants.INVALID_RETRY_COUNT, uuid);
assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
}
}