You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2016/10/03 22:58:46 UTC

[33/57] [abbrv] hadoop git commit: HADOOP-13537. Support external calls in the RPC call queue. Contributed by Daryn Sharp.

HADOOP-13537. Support external calls in the RPC call queue. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/236ac773
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/236ac773
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/236ac773

Branch: refs/heads/HDFS-10467
Commit: 236ac773c964fa21d6d5f1496023cd61818dd3b1
Parents: ee0c722
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Sep 29 13:27:30 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Sep 29 13:27:30 2016 -0500

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |  5 ++
 .../org/apache/hadoop/ipc/ExternalCall.java     | 91 ++++++++++++++++++++
 .../main/java/org/apache/hadoop/ipc/Server.java | 63 +++++++++-----
 .../java/org/apache/hadoop/ipc/TestRPC.java     | 85 ++++++++++++++++++
 4 files changed, 221 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index ec7c396..bded4b99 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -405,4 +405,9 @@
     <Bug pattern="NP_NULL_PARAM_DEREF"/>
   </Match>
 
+  <Match>
+    <Class name="org.apache.hadoop.ipc.ExternalCall"/>
+    <Field name="done"/>
+    <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
new file mode 100644
index 0000000..9b4cbcf
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.ipc.Server.Call;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public abstract class ExternalCall<T> extends Call {
+  private final PrivilegedExceptionAction<T> action;
+  private final AtomicBoolean done = new AtomicBoolean();
+  private T result;
+  private Throwable error;
+
+  public ExternalCall(PrivilegedExceptionAction<T> action) {
+    this.action = action;
+  }
+
+  public abstract UserGroupInformation getRemoteUser();
+
+  public final T get() throws IOException, InterruptedException {
+    waitForCompletion();
+    if (error != null) {
+      if (error instanceof IOException) {
+        throw (IOException)error;
+      } else {
+        throw new IOException(error);
+      }
+    }
+    return result;
+  }
+
+  // wait for response to be triggered to support postponed calls.
+  // Object.wait() clears the thread's interrupt status before it throws
+  // InterruptedException, so the exception is propagated directly: catching
+  // it and re-checking Thread.interrupted() would see false, swallow the
+  // interrupt, and leave the caller blocked until the call completes.
+  private void waitForCompletion() throws InterruptedException {
+    synchronized(done) {
+      // loop guards against spurious wakeups and unrelated notifications;
+      // doResponse() sets done under this same monitor
+      while (!done.get()) {
+        done.wait();
+      }
+    }
+  }
+
+  boolean isDone() {
+    return done.get();
+  }
+
+  // invoked by ipc handler
+  @Override
+  public final Void run() throws IOException {
+    try {
+      result = action.run();
+      sendResponse();
+    } catch (Throwable t) {
+      abortResponse(t);
+    }
+    return null;
+  }
+
+  @Override
+  final void doResponse(Throwable t) {
+    synchronized(done) {
+      error = t;
+      done.set(true);
+      done.notify();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f509d71..1c7e76a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -384,6 +384,11 @@ public abstract class Server {
     return (call != null) ? call.getRemoteUser() : null;
   }
 
+  public static String getProtocol() {
+    Call call = CurCall.get();
+    return (call != null) ? call.getProtocol() : null;
+  }
+
   /** Return true if the invocation was through an RPC.
    */
   public static boolean isRpcInvocation() {
@@ -672,6 +677,11 @@ public abstract class Server {
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
 
+    Call() {
+      this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
+        RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
+    }
+
     Call(Call call) {
       this(call.callId, call.retryCount, call.rpcKind, call.clientId,
           call.traceScope, call.callerContext);
@@ -703,6 +713,7 @@ public abstract class Server {
       return "Call#" + callId + " Retry#" + retryCount;
     }
 
+    @Override
     public Void run() throws Exception {
       return null;
     }
@@ -718,6 +729,10 @@ public abstract class Server {
       return (addr != null) ? addr.getHostAddress() : null;
     }
 
+    public String getProtocol() {
+      return null;
+    }
+
     /**
      * Allow a IPC response to be postponed instead of sent immediately
      * after the handler returns from the proxy method.  The intended use
@@ -800,6 +815,11 @@ public abstract class Server {
     }
 
     @Override
+    public String getProtocol() {
+      return "rpc";
+    }
+
+    @Override
     public UserGroupInformation getRemoteUser() {
       return connection.user;
     }
@@ -2333,33 +2353,15 @@ public abstract class Server {
       // Save the priority level assignment by the scheduler
       call.setPriorityLevel(callQueue.getPriorityLevel(call));
 
-      if (callQueue.isClientBackoffEnabled()) {
-        // if RPC queue is full, we will ask the RPC client to back off by
-        // throwing RetriableException. Whether RPC client will honor
-        // RetriableException and retry depends on client ipc retry policy.
-        // For example, FailoverOnNetworkExceptionRetry handles
-        // RetriableException.
-        queueRequestOrAskClientToBackOff(call);
-      } else {
-        callQueue.put(call);              // queue the call; maybe blocked here
+      try {
+        queueCall(call);
+      } catch (IOException ioe) {
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
       }
       incRpcCount();  // Increment the rpc count
     }
 
-    private void queueRequestOrAskClientToBackOff(Call call)
-        throws WrappedRpcServerException, InterruptedException {
-      // If rpc scheduler indicates back off based on performance
-      // degradation such as response time or rpc queue is full,
-      // we will ask the client to back off.
-      if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
-        rpcMetrics.incrClientBackoff();
-        RetriableException retriableException =
-            new RetriableException("Server is too busy.");
-        throw new WrappedRpcServerExceptionSuppressed(
-            RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
-      }
-    }
-
     /**
      * Establish RPC connection setup by negotiating SASL if required, then
      * reading and authorizing the connection header
@@ -2487,6 +2489,21 @@ public abstract class Server {
     }
   }
 
+  public void queueCall(Call call) throws IOException, InterruptedException {
+    if (!callQueue.isClientBackoffEnabled()) {
+      callQueue.put(call); // queue the call; maybe blocked here
+    } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
+      // If rpc scheduler indicates back off based on performance degradation
+      // such as response time or rpc queue is full, we will ask the client
+      // to back off by throwing RetriableException. Whether the client will
+      // honor RetriableException and retry depends on the client and its policy.
+      // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
+      // RetriableException.
+      rpcMetrics.incrClientBackoff();
+      throw new RetriableException("Server is too busy.");
+    }
+  }
+
   /** Handles queued calls . */
   private class Handler extends Thread {
     public Handler(int instanceNumber) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/236ac773/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index ff6b25e..92d9183 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -64,6 +64,7 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -926,6 +927,90 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test(timeout=30000)
+  public void testExternalCall() throws Exception {
+    final UserGroupInformation ugi = UserGroupInformation
+        .createUserForTesting("user123", new String[0]);
+    final IOException expectedIOE = new IOException("boom");
+
+    // use 1 handler so the callq can be plugged
+    final Server server = setupTestServer(conf, 1);
+    try {
+      final AtomicBoolean result = new AtomicBoolean();
+
+      ExternalCall<String> remoteUserCall = newExtCall(ugi,
+          new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+              return UserGroupInformation.getCurrentUser().getUserName();
+            }
+          });
+
+      ExternalCall<String> exceptionCall = newExtCall(ugi,
+          new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+              throw expectedIOE;
+            }
+          });
+
+      final CountDownLatch latch = new CountDownLatch(1);
+      final CyclicBarrier barrier = new CyclicBarrier(2);
+
+      ExternalCall<Void> barrierCall = newExtCall(ugi,
+          new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              // notify we are in a handler and then wait to keep the callq
+              // plugged up
+              latch.countDown();
+              barrier.await();
+              return null;
+            }
+          });
+
+      server.queueCall(barrierCall);
+      server.queueCall(exceptionCall);
+      server.queueCall(remoteUserCall);
+
+      // wait for barrier call to enter the handler, check that the other 2
+      // calls are actually queued
+      latch.await();
+      assertEquals(2, server.getCallQueueLen());
+
+      // unplug the callq
+      barrier.await();
+      barrierCall.get();
+
+      // verify correct ugi is used
+      String answer = remoteUserCall.get();
+      assertEquals(ugi.getUserName(), answer);
+
+      try {
+        exceptionCall.get();
+        fail("didn't throw");
+      } catch (IOException ioe) {
+        assertEquals(expectedIOE.getMessage(), ioe.getMessage());
+      }
+    } finally {
+      server.stop();
+    }
+  }
+
+  private <T> ExternalCall<T> newExtCall(UserGroupInformation ugi,
+      PrivilegedExceptionAction<T> callable) {
+    return new ExternalCall<T>(callable) {
+      @Override
+      public String getProtocol() {
+        return "test";
+      }
+      @Override
+      public UserGroupInformation getRemoteUser() {
+        return ugi;
+      }
+    };
+  }
+
   @Test
   public void testRpcMetrics() throws Exception {
     Server server;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org