You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/07/23 00:06:47 UTC

[hadoop] branch trunk updated: YARN-8900. [Router] Federation: routing getContainers REST invocations transparently to multiple RMs (#4543)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c84cb81ba9 YARN-8900. [Router] Federation: routing getContainers REST invocations transparently to multiple RMs (#4543)
5c84cb81ba9 is described below

commit 5c84cb81ba988ba4ac2c55f6b1c27f3081799989
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Sat Jul 23 08:06:38 2022 +0800

    YARN-8900. [Router] Federation: routing getContainers REST invocations transparently to multiple RMs (#4543)
---
 .../yarn/server/webapp/dao/ContainersInfo.java     |  4 ++
 .../router/webapp/FederationInterceptorREST.java   | 71 +++++++++++++++++++++-
 .../webapp/MockDefaultRequestInterceptorREST.java  | 48 +++++++++++++++
 .../webapp/TestFederationInterceptorREST.java      | 61 ++++++++++++++++---
 4 files changed, 174 insertions(+), 10 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java
index 545f84e3232..a1627152d5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.webapp.dao;
 
 import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -46,4 +47,7 @@ public class ContainersInfo {
     return container;
   }
 
+  public void addAll(Collection<? extends ContainerInfo> containersInfo) {
+    container.addAll(containersInfo); // bulk-append, e.g. to merge per-sub-cluster results
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index db2a8edcb2c..10b15775af1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.router.webapp;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -92,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesIn
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -1053,7 +1055,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
 
     // Collect all the responses in parallel
-
     for (int i = 0; i < subClustersActive.size(); i++) {
       try {
         Future<ClusterMetricsInfo> future = compSvc.take();
@@ -1336,7 +1337,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   @Override
   public ContainersInfo getContainers(HttpServletRequest req,
       HttpServletResponse res, String appId, String appAttemptId) {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Fan the call out to every active sub-cluster and merge the containers they return.
+    ContainersInfo containersInfo = new ContainersInfo();
+    Map<SubClusterId, SubClusterInfo> subClustersActive;
+    try {
+      subClustersActive = getActiveSubclusters();
+    } catch (NotFoundException e) {
+      LOG.error("Get all active sub cluster(s) error.", e);
+      return containersInfo;
+    }
+
+    try {
+      Class<?>[] argsClasses = new Class<?>[]{
+          HttpServletRequest.class, HttpServletResponse.class, String.class, String.class};
+      Object[] args = new Object[]{req, res, appId, appAttemptId};
+      ClientMethod remoteMethod = new ClientMethod("getContainers", argsClasses, args);
+      Map<SubClusterInfo, ContainersInfo> containersInfoMap =
+          invokeConcurrent(subClustersActive.values(), remoteMethod, ContainersInfo.class);
+      if (containersInfoMap != null) {
+        containersInfoMap.values().forEach(containers ->
+            containersInfo.addAll(containers.getContainers()));
+      }
+    } catch (Exception ex) {
+      // Best effort: log and return whatever has been merged so far.
+      LOG.error("Failed to return GetContainers.", ex);
+    }
+    return containersInfo;
   }
 
   @Override
@@ -1366,4 +1393,44 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       threadpool.shutdown();
     }
   }
+
+  /**
+   * Calls {@code request} on each sub-cluster in parallel, keying every non-null response by the
+   * sub-cluster that produced it (futures are tracked per sub-cluster, not by completion order).
+   */
+  private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
+      ClientMethod request, Class<R> clazz) {
+    Map<SubClusterInfo, Future<R>> futures = new HashMap<>();
+    for (final SubClusterInfo info : clusterIds) {
+      futures.put(info, this.threadpool.submit(() -> {
+        try {
+          DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+              info.getSubClusterId(), info.getRMWebServiceAddress());
+          Method method = DefaultRequestInterceptorREST.class.
+              getMethod(request.getMethodName(), request.getTypes());
+          return clazz.cast(method.invoke(interceptor, request.getParams()));
+        } catch (Exception e) {
+          LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(),
+              request.getMethodName(), e);
+          return null;
+        }
+      }));
+    }
+
+    Map<SubClusterInfo, R> results = new HashMap<>();
+    for (Map.Entry<SubClusterInfo, Future<R>> entry : futures.entrySet()) {
+      try {
+        R response = entry.getValue().get();
+        if (response != null) {
+          results.put(entry.getKey(), response);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      } catch (Throwable e) {
+        LOG.warn("SubCluster {} failed to {} report.", entry.getKey(), request.getMethodName(), e);
+      }
+    }
+    return results;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index b96aa5a581c..d41a0aee580 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -32,6 +33,12 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -45,6 +52,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -231,4 +240,43 @@ public class MockDefaultRequestInterceptorREST
   public void setRunning(boolean runningMode) {
     this.isRunning = runningMode;
   }
+
+  @Override
+  public ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
+      String appId, String appAttemptId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    // Deliberately skip the application-existence check: each sub-cluster must return exactly
+    // one container (with fields derived from its sub-cluster id) for the test to validate.
+    ContainersInfo containers = new ContainersInfo();
+
+    int subClusterId = Integer.parseInt(getSubClusterId().getId());
+
+    ContainerId containerId = ContainerId.newContainerId(
+        ApplicationAttemptId.fromString(appAttemptId), subClusterId);
+    Resource allocatedResource =
+        Resource.newInstance(subClusterId, subClusterId);
+
+    NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
+    Priority priority = Priority.newInstance(subClusterId);
+    long creationTime = subClusterId;
+    long finishTime = subClusterId;
+    String diagnosticInfo = "Diagnostic " + subClusterId;
+    String logUrl = "Log " + subClusterId;
+    int containerExitStatus = subClusterId;
+    ContainerState containerState = ContainerState.COMPLETE;
+    String nodeHttpAddress = "HttpAddress " + subClusterId;
+
+    ContainerReport containerReport = ContainerReport.newInstance(
+        containerId, allocatedResource, assignedNode, priority,
+        creationTime, finishTime, diagnosticInfo, logUrl,
+        containerExitStatus, containerState, nodeHttpAddress);
+
+    ContainerInfo container = new ContainerInfo(containerReport);
+    containers.add(container);
+
+    return containers;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index b3a7e9060c5..2e099f54e0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -24,9 +24,11 @@ import java.util.List;
 
 import javax.ws.rs.core.Response;
 
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -160,7 +163,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
 
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
@@ -187,7 +190,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
     context.setApplicationId(appId.toString());
@@ -259,7 +262,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
     context.setApplicationId(appId.toString());
@@ -286,7 +289,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
     AppState appState = new AppState("KILLED");
 
     Response response =
@@ -317,7 +320,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   public void testForceKillApplicationEmptyRequest()
       throws YarnException, IOException, InterruptedException {
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
 
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
@@ -341,7 +344,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
     context.setApplicationId(appId.toString());
@@ -478,7 +481,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
     context.setApplicationId(appId.toString());
@@ -505,7 +508,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       throws YarnException, IOException, InterruptedException {
 
     ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(Time.now(), 1);
 
     AppState response = interceptor.getAppState(null, appId.toString());
 
@@ -560,4 +563,46 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
             SubClusterRegisterRequest.newInstance(subClusterInfo));
   }
 
+  @Test
+  public void testGetContainers()
+      throws YarnException, IOException, InterruptedException {
+
+    // Submit an application so that it has a home sub-cluster before querying containers.
+    ApplicationId applicationId = ApplicationId.newInstance(Time.now(), 1);
+    ApplicationSubmissionContextInfo submissionContext = new ApplicationSubmissionContextInfo();
+    submissionContext.setApplicationId(applicationId.toString());
+
+    Response submitResponse = interceptor.submitApplication(submissionContext, null);
+    Assert.assertNotNull(submitResponse);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(applicationId));
+
+    // Query the containers of the first attempt through the federation interceptor.
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainersInfo containers =
+        interceptor.getContainers(null, null, applicationId.toString(), attemptId.toString());
+
+    // Each mock sub-cluster RM returns exactly one container, so the merged result holds one
+    // container per active sub-cluster (this setup presumably registers 4 — confirm in setUp).
+    Assert.assertEquals(4, containers.getContainers().size());
+  }
+
+  @Test
+  public void testGetContainersNotExists() {
+    String unknownAppId = ApplicationId.newInstance(Time.now(), 1).toString(); // never submitted
+    ContainersInfo merged = interceptor.getContainers(null, null, unknownAppId, null);
+    Assert.assertTrue(merged.getContainers().isEmpty());
+  }
+
+  @Test
+  public void testGetContainersWrongFormat() {
+    // Malformed application id: the merged result must still be a non-null, empty list.
+    ContainersInfo badAppResult =
+        interceptor.getContainers(null, null, "Application_wrong_id", null);
+    Assert.assertNotNull(badAppResult);
+    Assert.assertTrue(badAppResult.getContainers().isEmpty());
+    // Well-formed application id but malformed attempt id: likewise empty.
+    String validAppId = ApplicationId.newInstance(Time.now(), 1).toString();
+    Assert.assertTrue(interceptor.getContainers(null, null, validAppId, "AppAttempt_wrong_id")
+        .getContainers().isEmpty());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org