You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2016/10/03 22:58:46 UTC
[33/57] [abbrv] hadoop git commit: HADOOP-13537. Support external
calls in the RPC call queue. Contributed by Daryn Sharp.
HADOOP-13537. Support external calls in the RPC call queue. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/236ac773
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/236ac773
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/236ac773
Branch: refs/heads/HDFS-10467
Commit: 236ac773c964fa21d6d5f1496023cd61818dd3b1
Parents: ee0c722
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Sep 29 13:27:30 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Sep 29 13:27:30 2016 -0500
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 5 ++
.../org/apache/hadoop/ipc/ExternalCall.java | 91 ++++++++++++++++++++
.../main/java/org/apache/hadoop/ipc/Server.java | 63 +++++++++-----
.../java/org/apache/hadoop/ipc/TestRPC.java | 85 ++++++++++++++++++
4 files changed, 221 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index ec7c396..bded4b99 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -405,4 +405,9 @@
<Bug pattern="NP_NULL_PARAM_DEREF"/>
</Match>
+ <Match> <!-- ExternalCall uses its AtomicBoolean "done" as a wait/notify monitor -->
+ <Class name="org.apache.hadoop.ipc.ExternalCall"/>
+ <Field name="done"/>
+ <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
new file mode 100644
index 0000000..9b4cbcf
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.ipc.Server.Call;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public abstract class ExternalCall<T> extends Call {
+ private final PrivilegedExceptionAction<T> action;
+ private final AtomicBoolean done = new AtomicBoolean();
+ private T result;
+ private Throwable error;
+
+ public ExternalCall(PrivilegedExceptionAction<T> action) {
+ this.action = action;
+ }
+
+ public abstract UserGroupInformation getRemoteUser();
+
+ public final T get() throws IOException, InterruptedException {
+ waitForCompletion();
+ if (error != null) {
+ if (error instanceof IOException) {
+ throw (IOException)error;
+ } else {
+ throw new IOException(error);
+ }
+ }
+ return result;
+ }
+
+ // wait for response to be triggered to support postponed calls
+ private void waitForCompletion() throws InterruptedException {
+ synchronized(done) {
+ while (!done.get()) {
+ try {
+ done.wait();
+ } catch (InterruptedException ie) {
+ if (Thread.interrupted()) {
+ throw ie;
+ }
+ }
+ }
+ }
+ }
+
+ boolean isDone() {
+ return done.get();
+ }
+
+ // invoked by ipc handler
+ @Override
+ public final Void run() throws IOException {
+ try {
+ result = action.run();
+ sendResponse();
+ } catch (Throwable t) {
+ abortResponse(t);
+ }
+ return null;
+ }
+
+ @Override
+ final void doResponse(Throwable t) {
+ synchronized(done) {
+ error = t;
+ done.set(true);
+ done.notify();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f509d71..1c7e76a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -384,6 +384,11 @@ public abstract class Server {
return (call != null) ? call.getRemoteUser() : null;
}
+ public static String getProtocol() {
+ final Call current = CurCall.get(); // may be null: then nothing to report
+ return (current == null) ? null : current.getProtocol();
+ }
+
/** Return true if the invocation was through an RPC.
*/
public static boolean isRpcInvocation() {
@@ -672,6 +677,11 @@ public abstract class Server {
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
+ Call() { // ids/retry count are the invalid constants, kind is RPC_BUILTIN
+ this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
+ RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
+ }
+
Call(Call call) {
this(call.callId, call.retryCount, call.rpcKind, call.clientId,
call.traceScope, call.callerContext);
@@ -703,6 +713,7 @@ public abstract class Server {
return "Call#" + callId + " Retry#" + retryCount;
}
+ @Override
public Void run() throws Exception {
return null;
}
@@ -718,6 +729,10 @@ public abstract class Server {
return (addr != null) ? addr.getHostAddress() : null;
}
+ public String getProtocol() {
+ return null; // no protocol reported by default; overriding calls supply one
+ }
+
/**
* Allow a IPC response to be postponed instead of sent immediately
* after the handler returns from the proxy method. The intended use
@@ -800,6 +815,11 @@ public abstract class Server {
}
@Override
+ public String getProtocol() {
+ return "rpc"; // protocol label reported by this override
+ }
+
+ @Override
public UserGroupInformation getRemoteUser() {
return connection.user;
}
@@ -2333,33 +2353,15 @@ public abstract class Server {
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
- if (callQueue.isClientBackoffEnabled()) {
- // if RPC queue is full, we will ask the RPC client to back off by
- // throwing RetriableException. Whether RPC client will honor
- // RetriableException and retry depends on client ipc retry policy.
- // For example, FailoverOnNetworkExceptionRetry handles
- // RetriableException.
- queueRequestOrAskClientToBackOff(call);
- } else {
- callQueue.put(call); // queue the call; maybe blocked here
+ try {
+ queueCall(call);
+ } catch (IOException ioe) {
+ throw new WrappedRpcServerException(
+ RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
}
incRpcCount(); // Increment the rpc count
}
- private void queueRequestOrAskClientToBackOff(Call call)
- throws WrappedRpcServerException, InterruptedException {
- // If rpc scheduler indicates back off based on performance
- // degradation such as response time or rpc queue is full,
- // we will ask the client to back off.
- if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
- rpcMetrics.incrClientBackoff();
- RetriableException retriableException =
- new RetriableException("Server is too busy.");
- throw new WrappedRpcServerExceptionSuppressed(
- RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
- }
- }
-
/**
* Establish RPC connection setup by negotiating SASL if required, then
* reading and authorizing the connection header
@@ -2487,6 +2489,21 @@ public abstract class Server {
}
}
+ /** Queue a call for the handlers; put() may block. With client backoff
+ * enabled, throws RetriableException instead when asked to back off. */
+ public void queueCall(Call call) throws IOException, InterruptedException {
+ if (!callQueue.isClientBackoffEnabled()) {
+ callQueue.put(call); // queue the call; maybe blocked here
+ } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
+ // Back off was indicated by the rpc scheduler (performance degradation
+ // such as response time) or the rpc queue is full: ask the client to
+ // back off by throwing RetriableException. Honoring it depends on the
+ // client and its policy, e.g. FailoverOnNetworkExceptionRetry handles it.
+ rpcMetrics.incrClientBackoff();
+ throw new RetriableException("Server is too busy.");
+ }
+ }
+
/** Handles queued calls . */
private class Handler extends Thread {
public Handler(int instanceNumber) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index ff6b25e..92d9183 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -64,6 +64,7 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -926,6 +927,90 @@ public class TestRPC extends TestRpcBase {
}
}
+ @Test(timeout=30000)
+ public void testExternalCall() throws Exception {
+ final UserGroupInformation ugi = UserGroupInformation
+ .createUserForTesting("user123", new String[0]);
+ final IOException expectedIOE = new IOException("boom");
+
+ // use 1 handler so the callq can be plugged
+ final Server server = setupTestServer(conf, 1);
+ try {
+ // reports the user name the action observes when it is executed;
+ // checked against ugi below to verify the correct ugi is used
+ ExternalCall<String> remoteUserCall = newExtCall(ugi,
+ new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws Exception {
+ return UserGroupInformation.getCurrentUser().getUserName();
+ }
+ });
+
+ ExternalCall<String> exceptionCall = newExtCall(ugi,
+ new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws Exception {
+ throw expectedIOE;
+ }
+ });
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ ExternalCall<Void> barrierCall = newExtCall(ugi,
+ new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // notify we are in a handler and then wait to keep the callq
+ // plugged up
+ latch.countDown();
+ barrier.await();
+ return null;
+ }
+ });
+
+ server.queueCall(barrierCall);
+ server.queueCall(exceptionCall);
+ server.queueCall(remoteUserCall);
+
+ // wait for barrier call to enter the handler, check that the other 2
+ // calls are actually queued
+ latch.await();
+ assertEquals(2, server.getCallQueueLen());
+
+ // unplug the callq
+ barrier.await();
+ barrierCall.get();
+
+ // verify correct ugi is used
+ String answer = remoteUserCall.get();
+ assertEquals(ugi.getUserName(), answer);
+
+ try {
+ exceptionCall.get();
+ fail("didn't throw");
+ } catch (IOException ioe) {
+ assertEquals(expectedIOE.getMessage(), ioe.getMessage());
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+ private <T> ExternalCall<T> newExtCall(UserGroupInformation ugi,
+ PrivilegedExceptionAction<T> callable) {
+ return new ExternalCall<T>(callable) { // wraps callable for queueCall()
+ @Override
+ public UserGroupInformation getRemoteUser() {
+ return ugi; // the user supplied by the test
+ }
+ @Override
+ public String getProtocol() {
+ return "test";
+ }
+ };
+ }
+
@Test
public void testRpcMetrics() throws Exception {
Server server;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org