You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/08/09 17:36:50 UTC

[30/51] [abbrv] hadoop git commit: YARN-6955. Handle concurrent register AM requests in FederationInterceptor. (Botong Huang via Subru).

YARN-6955. Handle concurrent register AM requests in FederationInterceptor. (Botong Huang via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c61f2c41
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c61f2c41
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c61f2c41

Branch: refs/heads/HADOOP-13345
Commit: c61f2c419830e40bb47fb2b1fe1f7d6109ed29a9
Parents: bc20680
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Aug 7 16:58:29 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Aug 7 16:58:29 2017 -0700

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |  4 +-
 .../yarn/server/MockResourceManagerFacade.java  | 18 ++--
 .../amrmproxy/FederationInterceptor.java        | 43 ++++------
 .../amrmproxy/TestFederationInterceptor.java    | 88 ++++++++++++++++++--
 4 files changed, 110 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c61f2c41/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 034f03c..6825a36 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -594,11 +594,9 @@
     <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
   </Match>
 
-  <!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE -->
   <Match>
     <Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
-    <Method name="registerApplicationMaster" />
-    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c61f2c41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 68c55ac..e33d7e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -246,6 +246,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     shouldReRegisterNext = false;
 
+    synchronized (applicationContainerIdMap) {
+      if (applicationContainerIdMap.containsKey(amrmToken)) {
+        throw new InvalidApplicationMasterRequestException(
+            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+      }
+      // Keep track of the containers that are returned to this application
+      applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
+    }
+
+    // Make sure the wait used by certain test cases happens last in the method
     synchronized (syncObj) {
       syncObj.notifyAll();
       // We reuse the port number to indicate whether the unit test want us to
@@ -261,14 +271,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       }
     }
 
-    synchronized (applicationContainerIdMap) {
-      if (applicationContainerIdMap.containsKey(amrmToken)) {
-        throw new InvalidApplicationMasterRequestException(
-            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
-      }
-      // Keep track of the containers that are returned to this application
-      applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
-    }
     return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
         null, request.getHost(), null);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c61f2c41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ffe47f4..28724aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -208,22 +208,25 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * requests from AM because of timeout between AM and AMRMProxy, which is
    * shorter than the timeout + failOver between FederationInterceptor
    * (AMRMProxy) and RM.
+   *
+   * For the same reason, this method needs to be synchronized.
    */
   @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster(
-      RegisterApplicationMasterRequest request)
-      throws YarnException, IOException {
+  public synchronized RegisterApplicationMasterResponse
+      registerApplicationMaster(RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
     // If AM is calling with a different request, complain
-    if (this.amRegistrationRequest != null
-        && !this.amRegistrationRequest.equals(request)) {
-      throw new YarnException("A different request body recieved. AM should"
-          + " not call registerApplicationMaster with different request body");
+    if (this.amRegistrationRequest != null) {
+      if (!this.amRegistrationRequest.equals(request)) {
+        throw new YarnException("AM should not call "
+            + "registerApplicationMaster with a different request body");
+      }
+    } else {
+      // Save the registration request. This will be used for registering with
+      // secondary sub-clusters using UAMs, as well as re-register later
+      this.amRegistrationRequest = request;
     }
 
-    // Save the registration request. This will be used for registering with
-    // secondary sub-clusters using UAMs, as well as re-register later
-    this.amRegistrationRequest = request;
-
     /*
      * Present to AM as if we are the RM that never fails over. When actual RM
      * fails over, we always re-register automatically.
@@ -245,22 +248,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
      * is running and will breaks the elasticity feature. The registration with
      * the other sub-cluster RM will be done lazily as needed later.
      */
-    try {
-      this.amRegistrationResponse =
-          this.homeRM.registerApplicationMaster(request);
-    } catch (InvalidApplicationMasterRequestException e) {
-      if (e.getMessage()
-          .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
-        // Some other register thread might have succeeded in the meantime
-        if (this.amRegistrationResponse != null) {
-          LOG.info("Other concurrent thread registered successfully, "
-              + "simply return the same success register response");
-          return this.amRegistrationResponse;
-        }
-      }
-      // This is a real issue, throw back to AM
-      throw e;
-    }
+    this.amRegistrationResponse =
+        this.homeRM.registerApplicationMaster(request);
 
     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c61f2c41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 4e15323..34b0741 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
@@ -234,7 +240,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     RegisterApplicationMasterResponse registerResponse =
@@ -298,7 +304,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     RegisterApplicationMasterResponse registerResponse =
@@ -338,6 +344,78 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     Assert.assertEquals(true, finshResponse.getIsUnregistered());
   }
 
+  /*
+   * Test concurrent register threads. This is possible because the timeout
+   * between AM and AMRMProxy is shorter than the timeout + failOver between
+   * FederationInterceptor (AMRMProxy) and RM. When the first call is blocked by
+   * an RM failover and the AM times out, the AM calls us again, resulting in a
+   * second register thread.
+   */
+  @Test(timeout = 5000)
+  public void testConcurrentRegister()
+      throws InterruptedException, ExecutionException {
+    ExecutorService threadpool = Executors.newCachedThreadPool();
+    ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
+        new ExecutorCompletionService<>(threadpool);
+
+    Object syncObj = MockResourceManagerFacade.getSyncObj();
+
+    // Two register threads
+    synchronized (syncObj) {
+      // Make sure first thread will block within RM, before the second thread
+      // starts
+      LOG.info("Starting first register thread");
+      compSvc.submit(new ConcurrentRegisterAMCallable());
+
+      try {
+        LOG.info("Test main starts waiting for the first thread to block");
+        syncObj.wait();
+        LOG.info("Test main wait finished");
+      } catch (Exception e) {
+        LOG.info("Test main wait interrupted", e);
+      }
+    }
+
+    // The second thread will get already registered exception from RM.
+    LOG.info("Starting second register thread");
+    compSvc.submit(new ConcurrentRegisterAMCallable());
+
+    // Notify the first register thread to return
+    LOG.info("Let first blocked register thread move on");
+    synchronized (syncObj) {
+      syncObj.notifyAll();
+    }
+
+    // Both threads should return without exception
+    RegisterApplicationMasterResponse response = compSvc.take().get();
+    Assert.assertNotNull(response);
+
+    response = compSvc.take().get();
+    Assert.assertNotNull(response);
+
+    threadpool.shutdown();
+  }
+
+  /**
+   * A callable that makes a register AM call that blocks inside the mock RM.
+   */
+  public class ConcurrentRegisterAMCallable
+      implements Callable<RegisterApplicationMasterResponse> {
+    @Override
+    public RegisterApplicationMasterResponse call() throws Exception {
+      RegisterApplicationMasterResponse response = null;
+      try {
+        // Use port number 1001 to let mock RM block in the register call
+        response = interceptor.registerApplicationMaster(
+            RegisterApplicationMasterRequest.newInstance(null, 1001, null));
+      } catch (Exception e) {
+        LOG.info("Register thread exception", e);
+        response = null;
+      }
+      return response;
+    }
+  }
+
   @Test
   public void testRequestInterceptorChainCreation() throws Exception {
     RequestInterceptor root =
@@ -381,7 +459,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     for (int i = 0; i < 2; i++) {
@@ -397,7 +475,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     RegisterApplicationMasterResponse registerResponse =
@@ -407,7 +485,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     // Register the application second time with a different request obj
     registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("different");
     try {
       registerResponse = interceptor.registerApplicationMaster(registerReq);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org