You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:47:30 UTC

[47/50] [abbrv] hadoop git commit: YARN-6234. Support multiple attempts on the node when AMRMProxy is enabled. (Giovanni Matteo Fumarola via Subru).

YARN-6234. Support multiple attempts on the node when AMRMProxy is enabled. (Giovanni Matteo Fumarola via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd9ff27f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd9ff27f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd9ff27f

Branch: refs/heads/HDFS-7240
Commit: cd9ff27ffc9369820d0c39200a11bf00e6a767c8
Parents: 1769b12
Author: Subru Krishnan <su...@apache.org>
Authored: Mon May 8 16:41:30 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon May 8 16:41:30 2017 -0700

----------------------------------------------------------------------
 .../nodemanager/amrmproxy/AMRMProxyService.java | 32 ++++++++++++++---
 .../amrmproxy/TestAMRMProxyService.java         | 36 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd9ff27f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index 9f2d9a1..2696bca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -270,18 +270,40 @@ public class AMRMProxyService extends AbstractService implements
    * @param user
    * @param amrmToken
    */
-  protected void initializePipeline(
-      ApplicationAttemptId applicationAttemptId, String user,
-      Token<AMRMTokenIdentifier> amrmToken,
+  protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
+      String user, Token<AMRMTokenIdentifier> amrmToken,
       Token<AMRMTokenIdentifier> localToken) {
     RequestInterceptorChainWrapper chainWrapper = null;
     synchronized (applPipelineMap) {
-      if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
+      if (applPipelineMap
+          .containsKey(applicationAttemptId.getApplicationId())) {
         LOG.warn("Request to start an already existing appId was received. "
             + " This can happen if an application failed and a new attempt "
             + "was created on this machine.  ApplicationId: "
             + applicationAttemptId.toString());
-        return;
+
+        RequestInterceptorChainWrapper chainWrapperBackup =
+            this.applPipelineMap.get(applicationAttemptId.getApplicationId());
+        if (chainWrapperBackup != null
+            && chainWrapperBackup.getApplicationAttemptId() != null
+            && !chainWrapperBackup.getApplicationAttemptId()
+                .equals(applicationAttemptId)) {
+          // Remove the existing pipeline
+          LOG.info("Remove the previous pipeline for ApplicationId: "
+              + applicationAttemptId.toString());
+          RequestInterceptorChainWrapper pipeline =
+              applPipelineMap.remove(applicationAttemptId.getApplicationId());
+          try {
+            pipeline.getRootInterceptor().shutdown();
+          } catch (Throwable ex) {
+            LOG.warn(
+                "Failed to shutdown the request processing pipeline for app:"
+                    + applicationAttemptId.getApplicationId(),
+                ex);
+          }
+        } else {
+          return;
+        }
       }
 
       chainWrapper = new RequestInterceptorChainWrapper();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd9ff27f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index 7fffddf..837278c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,10 +28,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -380,6 +385,37 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
     }
   }
 
+  @Test
+  public void testMultipleAttemptsSameNode()
+      throws YarnException, IOException, Exception {
+
+    String user = "hadoop";
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId applicationAttemptId;
+
+    // First Attempt
+
+    RegisterApplicationMasterResponse response1 =
+        registerApplicationMaster(appId.getId());
+    Assert.assertNotNull(response1);
+
+    AllocateResponse allocateResponse = allocate(appId.getId());
+    Assert.assertNotNull(allocateResponse);
+
+    // Second Attempt
+
+    applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
+    getAMRMProxyService().initializePipeline(applicationAttemptId, user, null,
+        null);
+
+    RequestInterceptorChainWrapper chain2 =
+        getAMRMProxyService().getPipelines().get(appId);
+    Assert.assertEquals(applicationAttemptId, chain2.getApplicationAttemptId());
+
+    allocateResponse = allocate(appId.getId());
+    Assert.assertNotNull(allocateResponse);
+  }
+
   private List<Container> getContainersAndAssert(int appId,
       int numberOfResourceRequests) throws Exception {
     AllocateRequest allocateRequest =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org