You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2013/07/18 06:42:56 UTC
svn commit: r1504362 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/io/retry/
src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Author: szetszwo
Date: Thu Jul 18 04:42:56 2013
New Revision: 1504362
URL: http://svn.apache.org/r1504362
Log:
HADOOP-9716. Rpc retries should use the same call ID as the original call.
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jul 18 04:42:56 2013
@@ -477,6 +477,9 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9734. Common protobuf definitions for GetUserMappingsProtocol,
RefreshAuthorizationPolicyProtocol and RefreshUserMappingsProtocol (jlowe)
+ HADOOP-9716. Rpc retries should use the same call ID as the original call.
+ (szetszwo)
+
OPTIMIZATIONS
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Thu Jul 18 04:42:56 2013
@@ -18,18 +18,22 @@
package org.apache.hadoop.io.retry;
import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.ThreadUtil;
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
@@ -44,13 +48,13 @@ class RetryInvocationHandler implements
private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap;
private Object currentProxy;
-
- public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+
+ RetryInvocationHandler(FailoverProxyProvider proxyProvider,
RetryPolicy retryPolicy) {
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
}
- public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+ RetryInvocationHandler(FailoverProxyProvider proxyProvider,
RetryPolicy defaultPolicy,
Map<String, RetryPolicy> methodNameToPolicyMap) {
this.proxyProvider = proxyProvider;
@@ -69,6 +73,8 @@ class RetryInvocationHandler implements
// The number of times this method invocation has been failed over.
int invocationFailoverCount = 0;
+ final boolean isRpc = isRpcInvocation();
+ final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
int retries = 0;
while (true) {
// The number of times this invocation handler has ever been failed over,
@@ -78,6 +84,10 @@ class RetryInvocationHandler implements
synchronized (proxyProvider) {
invocationAttemptFailoverCount = proxyProviderFailoverCount;
}
+
+ if (isRpc) {
+ Client.setCallId(callId);
+ }
try {
Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true;
@@ -166,6 +176,14 @@ class RetryInvocationHandler implements
}
}
+ private boolean isRpcInvocation() {
+ if (!Proxy.isProxyClass(currentProxy.getClass())) {
+ return false;
+ }
+ final InvocationHandler ih = Proxy.getInvocationHandler(currentProxy);
+ return ih instanceof RpcInvocationHandler;
+ }
+
@Override
public void close() throws IOException {
proxyProvider.close();
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Jul 18 04:42:56 2013
@@ -91,6 +91,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedOutputStream;
@@ -106,11 +107,22 @@ public class Client {
public static final Log LOG = LogFactory.getLog(Client.class);
+ /** A counter for generating call IDs. */
+ private static final AtomicInteger callIdCounter = new AtomicInteger();
+
+ private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
+
+ /** Set call id for the next call. */
+ public static void setCallId(int cid) {
+ Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
+ Preconditions.checkState(callId.get() == null);
+ callId.set(cid);
+ }
+
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
private Class<? extends Writable> valueClass; // class of call values
- private final AtomicInteger counter = new AtomicInteger(); // call ID sequence
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
@@ -259,11 +271,15 @@ public class Client {
synchronized boolean isZeroReference() {
return refCount==0;
}
-
+
+ Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
+ return new Call(rpcKind, rpcRequest);
+ }
+
/**
* Class that represents an RPC call
*/
- private class Call {
+ static class Call {
final int id; // call id
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
@@ -271,10 +287,17 @@ public class Client {
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
- protected Call(RPC.RpcKind rpcKind, Writable param) {
+ private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;
- this.id = nextCallId();
+
+ final Integer id = callId.get();
+ if (id == null) {
+ this.id = nextCallId();
+ } else {
+ callId.set(null);
+ this.id = id;
+ }
}
/** Indicate when the call is complete and the
@@ -1346,7 +1369,7 @@ public class Client {
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass)
throws InterruptedException, IOException {
- Call call = new Call(rpcKind, rpcRequest);
+ final Call call = createCall(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call, serviceClass);
try {
connection.sendRpcRequest(call); // send the rpc request
@@ -1633,9 +1656,9 @@ public class Client {
* versions of the client did not mask off the sign bit, so a server may still
* see a negative call ID if it receives connections from an old client.
*
- * @return int next valid call ID
+ * @return next call ID
*/
- private int nextCallId() {
- return counter.getAndIncrement() & 0x7FFFFFFF;
+ public static int nextCallId() {
+ return callIdCounter.getAndIncrement() & 0x7FFFFFFF;
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Thu Jul 18 04:42:56 2013
@@ -124,7 +124,7 @@ public class ProtobufRpcEngine implement
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
- public Invoker(Class<?> protocol, Client.ConnectionId connId,
+ private Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Jul 18 04:42:56 2013
@@ -278,7 +278,7 @@ public abstract class Server {
*
* @return int sequential ID number of currently active RPC call
*/
- public static int getCallId() {
+ static int getCallId() {
Call call = CurCall.get();
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
}
@@ -464,12 +464,12 @@ public abstract class Server {
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
- public Call(int id, Writable param, Connection connection) {
+ private Call(int id, Writable param, Connection connection) {
this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID);
}
- public Call(int id, Writable param, Connection connection,
+ private Call(int id, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) {
this.callId = id;
this.rpcRequest = param;
@@ -482,7 +482,7 @@ public abstract class Server {
@Override
public String toString() {
- return rpcRequest.toString() + " from " + connection.toString();
+ return rpcRequest + " from " + connection + " Call#" + callId;
}
public void setResponse(ByteBuffer response) {
@@ -987,8 +987,7 @@ public abstract class Server {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
- call.connection);
+ LOG.debug(getName() + ": responding to " + call);
}
//
// Send as much data as we can in the non-blocking fashion
@@ -1007,8 +1006,8 @@ public abstract class Server {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
- call.connection + " Wrote " + numBytes + " bytes.");
+ LOG.debug(getName() + ": responding to " + call
+ + " Wrote " + numBytes + " bytes.");
}
} else {
//
@@ -1035,9 +1034,8 @@ public abstract class Server {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
- call.connection + " Wrote partial " + numBytes +
- " bytes.");
+ LOG.debug(getName() + ": responding to " + call
+ + " Wrote partial " + numBytes + " bytes.");
}
}
error = false; // everything went off well
@@ -2004,8 +2002,7 @@ public abstract class Server {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": has Call#" + call.callId +
- "for RpcKind " + call.rpcKind + " from " + call.connection);
+ LOG.debug(getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
String errorClass = null;
String error = null;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Thu Jul 18 04:42:56 2013
@@ -18,40 +18,49 @@
package org.apache.hadoop.ipc;
-import org.apache.commons.logging.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.net.ConnectTimeoutException;
-import org.apache.hadoop.net.NetUtils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.File;
import java.io.DataOutput;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
-import javax.net.SocketFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
-import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import javax.net.SocketFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
import org.junit.Assume;
+import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -655,6 +664,48 @@ public class TestIPC {
assertRetriesOnSocketTimeouts(conf, 4);
}
+ private static class CallId {
+ int id = RpcConstants.INVALID_CALL_ID;
+ }
+
+ /**
+ * Test if the rpc server uses the call id generated by the rpc client.
+ */
+ @Test
+ public void testCallIds() throws Exception {
+ final CallId callId = new CallId();
+
+ // Override client to store the call id
+ final Client client = new Client(LongWritable.class, conf) {
+ @Override
+ Call createCall(RpcKind rpcKind, Writable rpcRequest) {
+ final Call call = super.createCall(rpcKind, rpcRequest);
+ callId.id = call.id;
+ return call;
+ }
+ };
+
+ // Attach a listener that tracks every call ID received by the server.
+ final TestServer server = new TestServer(1, false);
+ server.callListener = new Runnable() {
+ @Override
+ public void run() {
+ Assert.assertEquals(callId.id, Server.getCallId());
+ }
+ };
+
+ try {
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ final SerialCaller caller = new SerialCaller(client, addr, 10);
+ caller.run();
+ assertFalse(caller.failed);
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
+
/**
* Tests that client generates a unique sequential call ID for each RPC call,
* even if multiple threads are using the same client.
@@ -701,8 +752,9 @@ public class TestIPC {
// of client call ID, so we must sort the call IDs before checking that it
// contains every expected value.
Collections.sort(callIds);
+ final int startID = callIds.get(0).intValue();
for (int i = 0; i < expectedCallCount; ++i) {
- assertEquals(i, callIds.get(i).intValue());
+ assertEquals(startID + i, callIds.get(i).intValue());
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Thu Jul 18 04:42:56 2013
@@ -105,10 +105,10 @@ public class TestIPCServerResponder exte
byte[] bytes = new byte[byteSize];
System.arraycopy(BYTES, 0, bytes, 0, byteSize);
Writable param = new BytesWritable(bytes);
- Writable value = client.call(param, address);
+ client.call(param, address);
Thread.sleep(RANDOM.nextInt(20));
} catch (Exception e) {
- LOG.fatal("Caught: " + e);
+ LOG.fatal("Caught Exception", e);
failed = true;
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java Thu Jul 18 04:42:56 2013
@@ -151,10 +151,8 @@ public class TestProtoBufRpc {
private static TestRpcService getClient() throws IOException {
// Set RPC engine to protobuf RPC engine
- RPC.setProtocolEngine(conf, TestRpcService.class,
- ProtobufRpcEngine.class);
- return RPC.getProxy(TestRpcService.class, 0, addr,
- conf);
+ RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
+ return RPC.getProxy(TestRpcService.class, 0, addr, conf);
}
private static TestRpcService2 getClient2() throws IOException {
@@ -191,6 +189,7 @@ public class TestProtoBufRpc {
RemoteException re = (RemoteException)e.getCause();
RpcServerException rse = (RpcServerException) re
.unwrapRemoteException(RpcServerException.class);
+ Assert.assertNotNull(rse);
Assert.assertTrue(re.getErrorCode().equals(
RpcErrorCodeProto.ERROR_RPC_SERVER));
}
@@ -246,6 +245,7 @@ public class TestProtoBufRpc {
.setMessage(shortString).build();
// short message goes through
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
+ Assert.assertEquals(shortString, echoResponse.getMessage());
final String longString = StringUtils.repeat("X", 4096);
echoRequest = EchoRequestProto.newBuilder()
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1504362&r1=1504361&r2=1504362&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Thu Jul 18 04:42:56 2013
@@ -94,7 +94,7 @@ public class TestRPC {
int datasize = 1024*100;
int numThreads = 50;
-
+
public interface TestProtocol extends VersionedProtocol {
public static final long versionID = 1L;
@@ -360,8 +360,7 @@ public class TestRPC {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
// create a client
- proxy = (TestProtocol)RPC.getProxy(
- TestProtocol.class, TestProtocol.versionID, addr, conf);
+ proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
assertEquals(addr, RPC.getServerAddress(proxy));
} finally {
@@ -388,8 +387,7 @@ public class TestRPC {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
// create a client
- proxy = (TestProtocol)RPC.getProxy(
- TestProtocol.class, TestProtocol.versionID, addr, conf);
+ proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
SlowRPC slowrpc = new SlowRPC(proxy);
Thread thread = new Thread(slowrpc, "SlowRPC");
@@ -432,8 +430,7 @@ public class TestRPC {
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);
- proxy = (TestProtocol)RPC.getProxy(
- TestProtocol.class, TestProtocol.versionID, addr, conf);
+ proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
proxy.ping();
@@ -557,8 +554,7 @@ public class TestRPC {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
try {
- proxy = (TestProtocol)RPC.getProxy(
- TestProtocol.class, TestProtocol.versionID, addr, conf);
+ proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
proxy.ping();
if (expectFailure) {
@@ -660,7 +656,7 @@ public class TestRPC {
@Test
public void testStopProxy() throws IOException {
- StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+ StoppedProtocol proxy = RPC.getProxy(StoppedProtocol.class,
StoppedProtocol.versionID, null, conf);
StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
Proxy.getInvocationHandler(proxy);
@@ -671,7 +667,7 @@ public class TestRPC {
@Test
public void testWrappedStopProxy() throws IOException {
- StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+ StoppedProtocol wrappedProxy = RPC.getProxy(StoppedProtocol.class,
StoppedProtocol.versionID, null, conf);
StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
Proxy.getInvocationHandler(wrappedProxy);
@@ -701,8 +697,7 @@ public class TestRPC {
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
TestProtocol proxy = null;
try {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
- TestProtocol.versionID, addr, conf);
+ proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
@@ -730,7 +725,7 @@ public class TestRPC {
proxy = null;
try {
UserGroupInformation.setConfiguration(conf);
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, mulitServerAddr, conf);
proxy.echo("");
} catch (RemoteException e) {
@@ -847,7 +842,7 @@ public class TestRPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
- final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+ final TestProtocol proxy = RPC.getProxy(
TestProtocol.class, TestProtocol.versionID, addr, conf);
// Connect to the server
proxy.ping();
@@ -887,8 +882,8 @@ public class TestRPC {
for (int i = 0; i < numConcurrentRPC; i++) {
final int num = i;
- final TestProtocol proxy = (TestProtocol) RPC.getProxy(
- TestProtocol.class, TestProtocol.versionID, addr, conf);
+ final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, addr, conf);
Thread rpcThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -906,7 +901,7 @@ public class TestRPC {
error.set(e);
}
- LOG.error(e);
+ LOG.error("thread " + num, e);
} finally {
latch.countDown();
}