You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/07/23 00:06:47 UTC
[hadoop] branch trunk updated: YARN-8900. [Router] Federation: routing getContainers REST invocations transparently to multiple RMs (#4543)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c84cb81ba9 YARN-8900. [Router] Federation: routing getContainers REST invocations transparently to multiple RMs (#4543)
5c84cb81ba9 is described below
commit 5c84cb81ba988ba4ac2c55f6b1c27f3081799989
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Sat Jul 23 08:06:38 2022 +0800
YARN-8900. [Router] Federation: routing getContainers REST invocations transparently to multiple RMs (#4543)
---
.../yarn/server/webapp/dao/ContainersInfo.java | 4 ++
.../router/webapp/FederationInterceptorREST.java | 71 +++++++++++++++++++++-
.../webapp/MockDefaultRequestInterceptorREST.java | 48 +++++++++++++++
.../webapp/TestFederationInterceptorREST.java | 61 ++++++++++++++++---
4 files changed, 174 insertions(+), 10 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java
index 545f84e3232..a1627152d5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainersInfo.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.webapp.dao;
import java.util.ArrayList;
+import java.util.Collection;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -46,4 +47,7 @@ public class ContainersInfo {
return container;
}
+  /**
+   * Appends every element of the given collection to the {@code container}
+   * collection held by this object.
+   *
+   * @param containersInfo the container entries to append
+   */
+  public void addAll(Collection<ContainerInfo> containersInfo) {
+    this.container.addAll(containersInfo);
+  }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index db2a8edcb2c..10b15775af1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
@@ -92,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesIn
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -1053,7 +1055,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
// Collect all the responses in parallel
-
for (int i = 0; i < subClustersActive.size(); i++) {
try {
Future<ClusterMetricsInfo> future = compSvc.take();
@@ -1336,7 +1337,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
@Override
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
- throw new NotImplementedException("Code is not implemented");
+
+ ContainersInfo containersInfo = new ContainersInfo();
+
+ Map<SubClusterId, SubClusterInfo> subClustersActive;
+ try {
+ subClustersActive = getActiveSubclusters();
+ } catch (NotFoundException e) {
+ LOG.error("Get all active sub cluster(s) error.", e);
+ return containersInfo;
+ }
+
+ try {
+ Class[] argsClasses = new Class[]{
+ HttpServletRequest.class, HttpServletResponse.class, String.class, String.class};
+ Object[] args = new Object[]{req, res, appId, appAttemptId};
+ ClientMethod remoteMethod = new ClientMethod("getContainers", argsClasses, args);
+ Map<SubClusterInfo, ContainersInfo> containersInfoMap =
+ invokeConcurrent(subClustersActive.values(), remoteMethod, ContainersInfo.class);
+ if (containersInfoMap != null) {
+ containersInfoMap.values().forEach(containers ->
+ containersInfo.addAll(containers.getContainers()));
+ }
+ } catch (Exception ex) {
+ LOG.error("Failed to return GetContainers.", ex);
+ }
+
+ return containersInfo;
}
@Override
@@ -1366,4 +1393,44 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
threadpool.shutdown();
}
}
+
+ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
+ ClientMethod request, Class<R> clazz) {
+
+ Map<SubClusterInfo, R> results = new HashMap<>();
+
+ // Send the requests in parallel
+ CompletionService<R> compSvc = new ExecutorCompletionService<>(this.threadpool);
+
+ for (final SubClusterInfo info : clusterIds) {
+ compSvc.submit(() -> {
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ info.getSubClusterId(), info.getRMWebServiceAddress());
+ try {
+ Method method = DefaultRequestInterceptorREST.class.
+ getMethod(request.getMethodName(), request.getTypes());
+ Object retObj = method.invoke(interceptor, request.getParams());
+ R ret = clazz.cast(retObj);
+ return ret;
+ } catch (Exception e) {
+ LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(),
+ request.getMethodName(), e);
+ return null;
+ }
+ });
+ }
+
+ clusterIds.stream().forEach(clusterId -> {
+ try {
+ Future<R> future = compSvc.take();
+ R response = future.get();
+ if (response != null) {
+ results.put(clusterId, response);
+ }
+ } catch (Throwable e) {
+ LOG.warn("SubCluster {} failed to {} report.", clusterId, request.getMethodName(), e);
+ }
+ });
+ return results;
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index b96aa5a581c..d41a0aee580 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -32,6 +33,12 @@ import javax.ws.rs.core.Response.Status;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -45,6 +52,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,4 +240,43 @@ public class MockDefaultRequestInterceptorREST
public void setRunning(boolean runningMode) {
this.isRunning = runningMode;
}
+
+  /**
+   * Mock answer for a containers query: fabricates a single completed container
+   * whose numeric attributes are all derived from this interceptor's sub-cluster id.
+   *
+   * @param req ignored
+   * @param res ignored
+   * @param appId ignored
+   * @param appAttemptId the attempt id the fabricated container is attached to
+   * @return a listing holding exactly one container
+   * @throws RuntimeException if the mock has been switched to the stopped state
+   */
+  @Override
+  public ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
+      String appId, String appAttemptId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    // The application is deliberately not looked up: the tests need every
+    // sub-cluster to answer with exactly one container.
+    final int id = Integer.parseInt(getSubClusterId().getId());
+
+    ContainerReport report = ContainerReport.newInstance(
+        ContainerId.newContainerId(ApplicationAttemptId.fromString(appAttemptId), id),
+        Resource.newInstance(id, id),
+        NodeId.newInstance("Node", id),
+        Priority.newInstance(id),
+        id, // creation time
+        id, // finish time
+        "Diagnostic " + id,
+        "Log " + id,
+        id, // container exit status
+        ContainerState.COMPLETE,
+        "HttpAddress " + id);
+
+    ContainersInfo result = new ContainersInfo();
+    result.add(new ContainerInfo(report));
+    return result;
+  }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index b3a7e9060c5..2e099f54e0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -24,9 +24,11 @@ import java.util.List;
import javax.ws.rs.core.Response;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.Assert;
import org.junit.Test;
@@ -160,7 +163,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
@@ -187,7 +190,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -259,7 +262,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -286,7 +289,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
AppState appState = new AppState("KILLED");
Response response =
@@ -317,7 +320,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
@@ -341,7 +344,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -478,7 +481,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
@@ -505,7 +508,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
throws YarnException, IOException, InterruptedException {
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(Time.now(), 1);
AppState response = interceptor.getAppState(null, appId.toString());
@@ -560,4 +563,46 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
+  /**
+   * Submits an application through the interceptor and then verifies the number
+   * of containers returned by {@code getContainers} for its first attempt.
+   */
+  @Test
+  public void testGetContainers()
+      throws YarnException, IOException, InterruptedException {
+
+    final ApplicationId applicationId = ApplicationId.newInstance(Time.now(), 1);
+    final String appIdText = applicationId.toString();
+
+    // Submit the application whose containers are queried afterwards.
+    ApplicationSubmissionContextInfo submission = new ApplicationSubmissionContextInfo();
+    submission.setApplicationId(appIdText);
+    Response submitResponse = interceptor.submitApplication(submission, null);
+
+    Assert.assertNotNull(submitResponse);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(applicationId));
+
+    String attemptIdText = ApplicationAttemptId.newInstance(applicationId, 1).toString();
+    ContainersInfo containers =
+        interceptor.getContainers(null, null, appIdText, attemptIdText);
+
+    // NOTE(review): presumably one container per sub-cluster with 4 sub-clusters
+    // registered by the test setup - confirm against the fixture.
+    Assert.assertEquals(4, containers.getContainers().size());
+  }
+
+  /**
+   * Queries containers for an application that was never submitted, with a
+   * {@code null} attempt id, and expects an empty listing.
+   */
+  @Test
+  public void testGetContainersNotExists() {
+    String unknownAppId = ApplicationId.newInstance(Time.now(), 1).toString();
+    ContainersInfo containers = interceptor.getContainers(null, null, unknownAppId, null);
+    Assert.assertTrue(containers.getContainers().isEmpty());
+  }
+
+  /**
+   * Checks that malformed identifiers produce an empty listing: first a malformed
+   * application id, then a well-formed application id with a malformed attempt id.
+   */
+  @Test
+  public void testGetContainersWrongFormat() {
+    // Malformed application id, no attempt id.
+    ContainersInfo forBadAppId =
+        interceptor.getContainers(null, null, "Application_wrong_id", null);
+    Assert.assertNotNull(forBadAppId);
+    Assert.assertTrue(forBadAppId.getContainers().isEmpty());
+
+    // Well-formed application id, malformed attempt id.
+    String validAppId = ApplicationId.newInstance(Time.now(), 1).toString();
+    ContainersInfo forBadAttemptId =
+        interceptor.getContainers(null, null, validAppId, "AppAttempt_wrong_id");
+    Assert.assertTrue(forBadAttemptId.getContainers().isEmpty());
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org