You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/08/30 22:05:23 UTC
[04/50] [abbrv] hadoop git commit: Revert "HADOOP-13465. Design
Server.Call to be extensible for unified call queue. Contributed by Daryn
Sharp."
Revert "HADOOP-13465. Design Server.Call to be extensible for unified call queue. Contributed by Daryn Sharp."
This reverts commit d288a0ba8364d81aacda9f4a21022eecb6dc4e22.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81485dbf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81485dbf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81485dbf
Branch: refs/heads/YARN-2915
Commit: 81485dbfc1ffb8daa609be8eb31094cc28646dd3
Parents: 1360bd2
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Aug 25 16:04:54 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Aug 25 16:04:54 2016 -0500
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/ipc/Server.java | 336 ++++++++-----------
1 file changed, 145 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/81485dbf/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 09fe889..4c73f6a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -354,9 +354,10 @@ public abstract class Server {
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
- return (call != null ) ? call.getHostInetAddress() : null;
+ return (call != null && call.connection != null) ? call.connection
+ .getHostInetAddress() : null;
}
-
+
/**
* Returns the clientId from the current RPC request
*/
@@ -379,9 +380,10 @@ public abstract class Server {
*/
public static UserGroupInformation getRemoteUser() {
Call call = CurCall.get();
- return (call != null) ? call.getRemoteUser() : null;
+ return (call != null && call.connection != null) ? call.connection.user
+ : null;
}
-
+
/** Return true if the invocation was through an RPC.
*/
public static boolean isRpcInvocation() {
@@ -481,7 +483,7 @@ public abstract class Server {
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
if(LOG.isWarnEnabled()) {
- String client = CurCall.get().toString();
+ String client = CurCall.get().connection.toString();
LOG.warn(
"Slow RPC : " + methodName + " took " + processingTime +
" milliseconds to process from client " + client);
@@ -655,65 +657,62 @@ public abstract class Server {
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
}
- /** A generic call queued for handling. */
- public static class Call implements Schedulable,
- PrivilegedExceptionAction<Void> {
- final int callId; // the client's call id
- final int retryCount; // the retry count of the call
- long timestamp; // time received when response is null
- // time served when response is not null
+ /** A call queued for handling. */
+ public static class Call implements Schedulable {
+ private final int callId; // the client's call id
+ private final int retryCount; // the retry count of the call
+ private final Writable rpcRequest; // Serialized Rpc request from client
+ private final Connection connection; // connection to client
+ private long timestamp; // time received when response is null
+ // time served when response is not null
+ private ByteBuffer rpcResponse; // the response for this call
private AtomicInteger responseWaitCount = new AtomicInteger(1);
- final RPC.RpcKind rpcKind;
- final byte[] clientId;
+ private final RPC.RpcKind rpcKind;
+ private final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side
private final CallerContext callerContext; // the call context
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
- Call(Call call) {
- this(call.callId, call.retryCount, call.rpcKind, call.clientId,
- call.traceScope, call.callerContext);
+ private Call(Call call) {
+ this(call.callId, call.retryCount, call.rpcRequest, call.connection,
+ call.rpcKind, call.clientId, call.traceScope, call.callerContext);
}
- Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) {
- this(id, retryCount, kind, clientId, null, null);
+ public Call(int id, int retryCount, Writable param,
+ Connection connection) {
+ this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
+ RpcConstants.DUMMY_CLIENT_ID);
}
- @VisibleForTesting // primarily TestNamenodeRetryCache
- public Call(int id, int retryCount, Void ignore1, Void ignore2,
+ public Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) {
- this(id, retryCount, kind, clientId, null, null);
+ this(id, retryCount, param, connection, kind, clientId, null, null);
}
- Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
- TraceScope traceScope, CallerContext callerContext) {
+ public Call(int id, int retryCount, Writable param, Connection connection,
+ RPC.RpcKind kind, byte[] clientId, TraceScope traceScope,
+ CallerContext callerContext) {
this.callId = id;
this.retryCount = retryCount;
+ this.rpcRequest = param;
+ this.connection = connection;
this.timestamp = Time.now();
+ this.rpcResponse = null;
this.rpcKind = kind;
this.clientId = clientId;
this.traceScope = traceScope;
this.callerContext = callerContext;
}
-
+
@Override
public String toString() {
- return "Call#" + callId + " Retry#" + retryCount;
+ return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+ + retryCount;
}
- public Void run() throws Exception {
- return null;
- }
- // should eventually be abstract but need to avoid breaking tests
- public UserGroupInformation getRemoteUser() {
- return null;
- }
- public InetAddress getHostInetAddress() {
- return null;
- }
- public String getHostAddress() {
- InetAddress addr = getHostInetAddress();
- return (addr != null) ? addr.getHostAddress() : null;
+ public void setResponse(ByteBuffer response) {
+ this.rpcResponse = response;
}
/**
@@ -725,36 +724,34 @@ public abstract class Server {
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"})
- public final void postponeResponse() {
+ public void postponeResponse() {
int count = responseWaitCount.incrementAndGet();
assert count > 0 : "response has already been sent";
}
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"})
- public final void sendResponse() throws IOException {
+ public void sendResponse() throws IOException {
int count = responseWaitCount.decrementAndGet();
assert count >= 0 : "response has already been sent";
if (count == 0) {
- doResponse(null);
+ connection.sendResponse(this);
}
}
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"HDFS"})
- public final void abortResponse(Throwable t) throws IOException {
+ public void abortResponse(Throwable t) throws IOException {
// don't send response if the call was already sent or aborted.
if (responseWaitCount.getAndSet(-1) > 0) {
- doResponse(t);
+ connection.abortResponse(this, t);
}
}
- void doResponse(Throwable t) throws IOException {}
-
// For Schedulable
@Override
public UserGroupInformation getUserGroupInformation() {
- return getRemoteUser();
+ return connection.user;
}
@Override
@@ -767,114 +764,6 @@ public abstract class Server {
}
}
- /** A RPC extended call queued for handling. */
- private class RpcCall extends Call {
- final Connection connection; // connection to client
- final Writable rpcRequest; // Serialized Rpc request from client
- ByteBuffer rpcResponse; // the response for this call
-
- RpcCall(RpcCall call) {
- super(call);
- this.connection = call.connection;
- this.rpcRequest = call.rpcRequest;
- }
-
- RpcCall(Connection connection, int id) {
- this(connection, id, RpcConstants.INVALID_RETRY_COUNT);
- }
-
- RpcCall(Connection connection, int id, int retryCount) {
- this(connection, id, retryCount, null,
- RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID,
- null, null);
- }
-
- RpcCall(Connection connection, int id, int retryCount,
- Writable param, RPC.RpcKind kind, byte[] clientId,
- TraceScope traceScope, CallerContext context) {
- super(id, retryCount, kind, clientId, traceScope, context);
- this.connection = connection;
- this.rpcRequest = param;
- }
-
- @Override
- public UserGroupInformation getRemoteUser() {
- return connection.user;
- }
-
- @Override
- public InetAddress getHostInetAddress() {
- return connection.getHostInetAddress();
- }
-
- @Override
- public Void run() throws Exception {
- if (!connection.channel.isOpen()) {
- Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
- return null;
- }
- String errorClass = null;
- String error = null;
- RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
- RpcErrorCodeProto detailedErr = null;
- Writable value = null;
-
- try {
- value = call(
- rpcKind, connection.protocolName, rpcRequest, timestamp);
- } catch (Throwable e) {
- if (e instanceof UndeclaredThrowableException) {
- e = e.getCause();
- }
- logException(Server.LOG, e, this);
- if (e instanceof RpcServerException) {
- RpcServerException rse = ((RpcServerException)e);
- returnStatus = rse.getRpcStatusProto();
- detailedErr = rse.getRpcErrorCodeProto();
- } else {
- returnStatus = RpcStatusProto.ERROR;
- detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
- }
- errorClass = e.getClass().getName();
- error = StringUtils.stringifyException(e);
- // Remove redundant error class name from the beginning of the
- // stack trace
- String exceptionHdr = errorClass + ": ";
- if (error.startsWith(exceptionHdr)) {
- error = error.substring(exceptionHdr.length());
- }
- }
- setupResponse(this, returnStatus, detailedErr,
- value, errorClass, error);
- sendResponse();
- return null;
- }
-
- void setResponse(ByteBuffer response) throws IOException {
- this.rpcResponse = response;
- }
-
- @Override
- void doResponse(Throwable t) throws IOException {
- RpcCall call = this;
- if (t != null) {
- // clone the call to prevent a race with another thread stomping
- // on the response while being sent. the original call is
- // effectively discarded since the wait count won't hit zero
- call = new RpcCall(this);
- setupResponse(call,
- RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
- null, t.getClass().getName(), StringUtils.stringifyException(t));
- }
- connection.sendResponse(this);
- }
-
- @Override
- public String toString() {
- return super.toString() + " " + rpcRequest + " from " + connection;
- }
- }
-
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
@@ -1205,22 +1094,22 @@ public abstract class Server {
if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses.");
}
- ArrayList<RpcCall> calls;
+ ArrayList<Call> calls;
// get the list of channels from list of keys.
synchronized (writeSelector.keys()) {
- calls = new ArrayList<RpcCall>(writeSelector.keys().size());
+ calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
- RpcCall call = (RpcCall)key.attachment();
+ Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}
-
- for (RpcCall call : calls) {
+
+ for(Call call : calls) {
doPurge(call, now);
}
} catch (OutOfMemoryError e) {
@@ -1238,7 +1127,7 @@ public abstract class Server {
}
private void doAsyncWrite(SelectionKey key) throws IOException {
- RpcCall call = (RpcCall)key.attachment();
+ Call call = (Call)key.attachment();
if (call == null) {
return;
}
@@ -1266,10 +1155,10 @@ public abstract class Server {
// Remove calls that have been pending in the responseQueue
// for a long time.
//
- private void doPurge(RpcCall call, long now) {
- LinkedList<RpcCall> responseQueue = call.connection.responseQueue;
+ private void doPurge(Call call, long now) {
+ LinkedList<Call> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) {
- Iterator<RpcCall> iter = responseQueue.listIterator(0);
+ Iterator<Call> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
if (now > call.timestamp + PURGE_INTERVAL) {
@@ -1283,12 +1172,12 @@ public abstract class Server {
// Processes one response. Returns true if there are no more pending
// data for this channel.
//
- private boolean processResponse(LinkedList<RpcCall> responseQueue,
+ private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
- RpcCall call = null;
+ Call call = null;
try {
synchronized (responseQueue) {
//
@@ -1371,7 +1260,7 @@ public abstract class Server {
//
// Enqueue a response from the application.
//
- void doRespond(RpcCall call) throws IOException {
+ void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
// must only wrap before adding to the responseQueue to prevent
// postponed responses from being encrypted and sent out of order.
@@ -1469,7 +1358,7 @@ public abstract class Server {
private SocketChannel channel;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
- private LinkedList<RpcCall> responseQueue;
+ private LinkedList<Call> responseQueue;
// number of outstanding rpcs
private AtomicInteger rpcCount = new AtomicInteger();
private long lastContact;
@@ -1496,8 +1385,8 @@ public abstract class Server {
public UserGroupInformation attemptingUser = null; // user name before auth
// Fake 'call' for failed authorization response
- private final RpcCall authFailedCall =
- new RpcCall(this, AUTHORIZATION_FAILED_CALL_ID);
+ private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
+ RpcConstants.INVALID_RETRY_COUNT, null, this);
private boolean sentNegotiate = false;
private boolean useWrap = false;
@@ -1520,7 +1409,7 @@ public abstract class Server {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort();
- this.responseQueue = new LinkedList<RpcCall>();
+ this.responseQueue = new LinkedList<Call>();
if (socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(socketSendBufferSize);
@@ -1815,7 +1704,8 @@ public abstract class Server {
}
private void doSaslReply(Message message) throws IOException {
- final RpcCall saslCall = new RpcCall(this, AuthProtocol.SASL.callId);
+ final Call saslCall = new Call(AuthProtocol.SASL.callId,
+ RpcConstants.INVALID_RETRY_COUNT, null, this);
setupResponse(saslCall,
RpcStatusProto.SUCCESS, null,
RpcWritable.wrap(message), null, null);
@@ -2032,20 +1922,23 @@ public abstract class Server {
if (clientVersion >= 9) {
// Versions >>9 understand the normal response
- RpcCall fakeCall = new RpcCall(this, -1);
+ Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
setupResponse(fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg);
fakeCall.sendResponse();
} else if (clientVersion >= 3) {
- RpcCall fakeCall = new RpcCall(this, -1);
+ Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
// Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
fakeCall.sendResponse();
} else if (clientVersion == 2) { // Hadoop 0.18.3
- RpcCall fakeCall = new RpcCall(this, 0);
+ Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
+ this);
DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID
out.writeBoolean(true); // error
@@ -2057,7 +1950,7 @@ public abstract class Server {
}
private void setupHttpRequestOnIpcPortResponse() throws IOException {
- RpcCall fakeCall = new RpcCall(this, 0);
+ Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8)));
fakeCall.sendResponse();
@@ -2205,7 +2098,7 @@ public abstract class Server {
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
- final RpcCall call = new RpcCall(this, callId, retry);
+ final Call call = new Call(callId, retry, null, this);
setupResponse(call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
@@ -2305,9 +2198,8 @@ public abstract class Server {
.build();
}
- RpcCall call = new RpcCall(this, header.getCallId(),
- header.getRetryCount(), rpcRequest,
- ProtoUtil.convert(header.getRpcKind()),
+ Call call = new Call(header.getCallId(), header.getRetryCount(),
+ rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceScope, callerContext);
// Save the priority level assignment by the scheduler
@@ -2431,10 +2323,21 @@ public abstract class Server {
}
}
- private void sendResponse(RpcCall call) throws IOException {
+ private void sendResponse(Call call) throws IOException {
responder.doRespond(call);
}
+ private void abortResponse(Call call, Throwable t) throws IOException {
+ // clone the call to prevent a race with the other thread stomping
+ // on the response while being sent. the original call is
+ // effectively discarded since the wait count won't hit zero
+ call = new Call(call);
+ setupResponse(call,
+ RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
+ null, t.getClass().getName(), StringUtils.stringifyException(t));
+ call.sendResponse();
+ }
+
/**
* Get service class for connection
* @return the serviceClass
@@ -2485,6 +2388,16 @@ public abstract class Server {
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
+ if (!call.connection.channel.isOpen()) {
+ LOG.info(Thread.currentThread().getName() + ": skipped " + call);
+ continue;
+ }
+ String errorClass = null;
+ String error = null;
+ RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
+ RpcErrorCodeProto detailedErr = null;
+ Writable value = null;
+
CurCall.set(call);
if (call.traceScope != null) {
call.traceScope.reattach();
@@ -2493,11 +2406,53 @@ public abstract class Server {
}
// always update the current call context
CallerContext.setCurrent(call.callerContext);
- UserGroupInformation remoteUser = call.getRemoteUser();
- if (remoteUser != null) {
- remoteUser.doAs(call);
- } else {
- call.run();
+
+ try {
+ // Make the call as the user via Subject.doAs, thus associating
+ // the call with the Subject
+ if (call.connection.user == null) {
+ value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
+ call.timestamp);
+ } else {
+ value =
+ call.connection.user.doAs
+ (new PrivilegedExceptionAction<Writable>() {
+ @Override
+ public Writable run() throws Exception {
+ // make the call
+ return call(call.rpcKind, call.connection.protocolName,
+ call.rpcRequest, call.timestamp);
+
+ }
+ }
+ );
+ }
+ } catch (Throwable e) {
+ if (e instanceof UndeclaredThrowableException) {
+ e = e.getCause();
+ }
+ logException(LOG, e, call);
+ if (e instanceof RpcServerException) {
+ RpcServerException rse = ((RpcServerException)e);
+ returnStatus = rse.getRpcStatusProto();
+ detailedErr = rse.getRpcErrorCodeProto();
+ } else {
+ returnStatus = RpcStatusProto.ERROR;
+ detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;
+ }
+ errorClass = e.getClass().getName();
+ error = StringUtils.stringifyException(e);
+ // Remove redundant error class name from the beginning of the stack trace
+ String exceptionHdr = errorClass + ": ";
+ if (error.startsWith(exceptionHdr)) {
+ error = error.substring(exceptionHdr.length());
+ }
+ }
+ CurCall.set(null);
+ synchronized (call.connection.responseQueue) {
+ setupResponse(call, returnStatus, detailedErr,
+ value, errorClass, error);
+ call.sendResponse();
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
@@ -2514,7 +2469,6 @@ public abstract class Server {
StringUtils.stringifyException(e));
}
} finally {
- CurCall.set(null);
IOUtils.cleanup(LOG, traceScope);
}
}
@@ -2716,7 +2670,7 @@ public abstract class Server {
* @throws IOException
*/
private void setupResponse(
- RpcCall call, RpcStatusProto status, RpcErrorCodeProto erCode,
+ Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
Writable rv, String errorClass, String error)
throws IOException {
RpcResponseHeaderProto.Builder headerBuilder =
@@ -2750,7 +2704,7 @@ public abstract class Server {
}
}
- private void setupResponse(RpcCall call,
+ private void setupResponse(Call call,
RpcResponseHeaderProto header, Writable rv) throws IOException {
ResponseBuffer buf = responseBuffer.get().reset();
try {
@@ -2784,7 +2738,7 @@ public abstract class Server {
* @throws IOException
*/
private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
- RpcCall call,
+ Call call,
Writable rv, String errorClass, String error)
throws IOException {
final int OLD_VERSION_FATAL_STATUS = -1;
@@ -2797,7 +2751,7 @@ public abstract class Server {
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
- private void wrapWithSasl(RpcCall call) throws IOException {
+ private void wrapWithSasl(Call call) throws IOException {
if (call.connection.saslServer != null) {
byte[] token = call.rpcResponse.array();
// synchronization may be needed since there can be multiple Handler
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org