You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2020/08/03 07:26:11 UTC

[hadoop] branch branch-3.3 updated: YARN-10229. [Federation] Client should be able to submit application to RM directly using normal client conf. Contributed by Bilwa S T.

This is an automated email from the ASF dual-hosted git repository.

brahma pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 643ff48  YARN-10229. [Federation] Client should be able to submit application to RM directly using normal client conf. Contributed by Bilwa S T.
643ff48 is described below

commit 643ff4881dba5379741c45292376fd98f0a32ba0
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Mon Aug 3 12:54:36 2020 +0530

    YARN-10229. [Federation] Client should be able to submit application to RM directly using normal client conf. Contributed by Bilwa S T.
    
    (cherry picked from commit eac558380fd7d3c2e78b8956e2080688bb1dd8bb)
---
 .../nodemanager/amrmproxy/AMRMProxyService.java    | 35 ++++++++++++++++++++--
 .../amrmproxy/TestAMRMProxyService.java            | 21 +++++++++++++
 2 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index d3c4a1d..fe278f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -108,6 +108,8 @@ public class AMRMProxyService extends CompositeService implements
   private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
   private RegistryOperations registry;
   private AMRMProxyMetrics metrics;
+  private FederationStateStoreFacade federationFacade;
+  private boolean federationEnabled = false;
 
   /**
    * Creates an instance of the service.
@@ -144,7 +146,10 @@ public class AMRMProxyService extends CompositeService implements
           RegistryOperations.class);
       addService(this.registry);
     }
-
+    this.federationFacade = FederationStateStoreFacade.getInstance();
+    this.federationEnabled =
+        conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
+            YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
     super.serviceInit(conf);
   }
 
@@ -389,13 +394,22 @@ public class AMRMProxyService extends CompositeService implements
       throws IOException, YarnException {
     long startTime = clock.getTime();
     try {
-      LOG.info("Callback received for initializing request "
-          + "processing pipeline for an AM");
       ContainerTokenIdentifier containerTokenIdentifierForKey =
           BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
       ApplicationAttemptId appAttemptId =
           containerTokenIdentifierForKey.getContainerID()
               .getApplicationAttemptId();
+      ApplicationId applicationID = appAttemptId.getApplicationId();
+      // Checking if application is there in federation state store only
+      // if federation is enabled. If
+      // application is submitted to router then it adds it in statestore.
+      // if application is not found in statestore that means its
+      // submitted to RM
+      if (!checkIfAppExistsInStateStore(applicationID)) {
+        return;
+      }
+      LOG.info("Callback received for initializing request "
+          + "processing pipeline for an AM");
       Credentials credentials = YarnServerSecurityUtils
           .parseCredentials(request.getContainerLaunchContext());
 
@@ -772,6 +786,21 @@ public class AMRMProxyService extends CompositeService implements
     }
   }
 
+  boolean checkIfAppExistsInStateStore(ApplicationId applicationID) {
+    if (!federationEnabled) {
+      return true;
+    }
+
+    try {
+      // Check if app is there in state store. If app is not there then it
+      // throws Exception
+      this.federationFacade.getApplicationHomeSubCluster(applicationID);
+    } catch (YarnException ex) {
+      return false;
+    }
+    return true;
+  }
+
   @SuppressWarnings("unchecked")
   private Token<AMRMTokenIdentifier> getFirstAMRMToken(
       Collection<Token<? extends TokenIdentifier>> allTokens) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index b269fa4..60e3838 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -662,6 +662,27 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
     Assert.assertEquals(0, state.getAppContexts().size());
   }
 
+  @Test
+  public void testCheckIfAppExistsInStateStore()
+      throws IOException, YarnException {
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    Configuration conf = createConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+
+    createAndStartAMRMProxyService(conf);
+
+    Assert.assertEquals(false,
+        getAMRMProxyService().checkIfAppExistsInStateStore(appId));
+
+    Configuration distConf = createConfiguration();
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+
+    createAndStartAMRMProxyService(distConf);
+
+    Assert.assertEquals(true,
+        getAMRMProxyService().checkIfAppExistsInStateStore(appId));
+  }
+
   /**
    * A mock intercepter implementation that uses the same mockRM instance across
    * restart.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org