You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/06/09 00:34:53 UTC
[hadoop] branch trunk updated: YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98ca6fa10a9 YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)
98ca6fa10a9 is described below
commit 98ca6fa10a93257dc7a72d534128864ff159cc88
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Wed Jun 8 17:34:43 2022 -0700
YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)
---
.../hadoop/yarn/server/router/RouterMetrics.java | 109 +++++++++++++-
.../clientrm/FederationClientInterceptor.java | 122 +++++++++++++++-
.../yarn/server/router/TestRouterMetrics.java | 98 +++++++++++++
.../clientrm/TestFederationClientInterceptor.java | 160 ++++++++++++++++++++-
.../TestableFederationClientInterceptor.java | 3 +
.../src/test/resources/yarn-site.xml | 4 +
6 files changed, 487 insertions(+), 9 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 1399b2f4b22..b02b3e155fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -75,6 +75,12 @@ public final class RouterMetrics {
private MutableGaugeInt numListReservationsFailedRetrieved;
@Metric("# of getResourceTypeInfo failed to be retrieved")
private MutableGaugeInt numGetResourceTypeInfo;
+ @Metric("# of failApplicationAttempt failed to be retrieved")
+ private MutableGaugeInt numFailAppAttemptFailedRetrieved;
+ @Metric("# of updateApplicationPriority failed to be retrieved")
+ private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
+ @Metric("# of updateApplicationTimeouts failed to be retrieved")
+ private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -114,6 +120,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededListReservationsRetrieved;
@Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)")
private MutableRate totalSucceededGetResourceTypeInfoRetrieved;
+ @Metric("Total number of successful Retrieved failApplicationAttempt and latency(ms)")
+ private MutableRate totalSucceededFailAppAttemptRetrieved;
+ @Metric("Total number of successful Retrieved updateApplicationPriority and latency(ms)")
+ private MutableRate totalSucceededUpdateAppPriorityRetrieved;
+ @Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
+ private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
/**
* Provide quantile counters for all latencies.
@@ -135,8 +147,11 @@ public final class RouterMetrics {
private MutableQuantiles getContainerLatency;
private MutableQuantiles listReservationsLatency;
private MutableQuantiles listResourceTypeInfoLatency;
+ private MutableQuantiles failAppAttemptLatency;
+ private MutableQuantiles updateAppPriorityLatency;
+ private MutableQuantiles updateAppTimeoutsLatency;
- private static volatile RouterMetrics INSTANCE = null;
+ private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
private RouterMetrics() {
@@ -201,25 +216,37 @@ public final class RouterMetrics {
listResourceTypeInfoLatency =
registry.newQuantiles("getResourceTypeInfoLatency",
"latency of get resource type info", "ops", "latency", 10);
+
+ failAppAttemptLatency =
+ registry.newQuantiles("failApplicationAttemptLatency",
+ "latency of fail application attempt", "ops", "latency", 10);
+
+ updateAppPriorityLatency =
+ registry.newQuantiles("updateApplicationPriorityLatency",
+ "latency of update application priority", "ops", "latency", 10);
+
+ updateAppTimeoutsLatency =
+ registry.newQuantiles("updateApplicationTimeoutsLatency",
+ "latency of update application timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
if (!isInitialized.get()) {
synchronized (RouterMetrics.class) {
- if (INSTANCE == null) {
- INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
+ if (instance == null) {
+ instance = DefaultMetricsSystem.instance().register("RouterMetrics",
"Metrics for the Yarn Router", new RouterMetrics());
isInitialized.set(true);
}
}
}
- return INSTANCE;
+ return instance;
}
@VisibleForTesting
synchronized static void destroy() {
isInitialized.set(false);
- INSTANCE = null;
+ instance = null;
}
@VisibleForTesting
@@ -307,6 +334,21 @@ public final class RouterMetrics {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples();
}
+ @VisibleForTesting
+ public long getNumSucceededFailAppAttemptRetrieved() {
+ return totalSucceededFailAppAttemptRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededUpdateAppPriorityRetrieved() {
+ return totalSucceededUpdateAppPriorityRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededUpdateAppTimeoutsRetrieved() {
+ return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
+ }
+
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@@ -392,6 +434,21 @@ public final class RouterMetrics {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean();
}
+ @VisibleForTesting
+ public double getLatencySucceededFailAppAttemptRetrieved() {
+ return totalSucceededFailAppAttemptRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededUpdateAppPriorityRetrieved() {
+ return totalSucceededUpdateAppPriorityRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededUpdateAppTimeoutsRetrieved() {
+ return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
+ }
+
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@@ -477,6 +534,21 @@ public final class RouterMetrics {
return numGetResourceTypeInfo.value();
}
+ @VisibleForTesting
+ public int getFailApplicationAttemptFailedRetrieved() {
+ return numFailAppAttemptFailedRetrieved.value();
+ }
+
+ @VisibleForTesting
+ public int getUpdateApplicationPriorityFailedRetrieved() {
+ return numUpdateAppPriorityFailedRetrieved.value();
+ }
+
+ @VisibleForTesting
+ public int getUpdateApplicationTimeoutsFailedRetrieved() {
+ return numUpdateAppTimeoutsFailedRetrieved.value();
+ }
+
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@@ -562,6 +634,21 @@ public final class RouterMetrics {
listResourceTypeInfoLatency.add(duration);
}
+ /**
+ * Records one successful failApplicationAttempt call by adding its duration
+ * to both the aggregate rate and the latency quantiles.
+ *
+ * @param duration latency of the call; the metric description labels it ms.
+ */
+ public void succeededFailAppAttemptRetrieved(long duration) {
+ totalSucceededFailAppAttemptRetrieved.add(duration);
+ failAppAttemptLatency.add(duration);
+ }
+
+ /**
+ * Records one successful updateApplicationPriority call by adding its
+ * duration to both the aggregate rate and the latency quantiles.
+ *
+ * @param duration latency of the call; the metric description labels it ms.
+ */
+ public void succeededUpdateAppPriorityRetrieved(long duration) {
+ totalSucceededUpdateAppPriorityRetrieved.add(duration);
+ updateAppPriorityLatency.add(duration);
+ }
+
+ /**
+ * Records one successful updateApplicationTimeouts call by adding its
+ * duration to both the aggregate rate and the latency quantiles.
+ *
+ * @param duration latency of the call; the metric description labels it ms.
+ */
+ public void succeededUpdateAppTimeoutsRetrieved(long duration) {
+ totalSucceededUpdateAppTimeoutsRetrieved.add(duration);
+ updateAppTimeoutsLatency.add(duration);
+ }
+
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@@ -629,4 +716,16 @@ public final class RouterMetrics {
public void incrResourceTypeInfoFailedRetrieved() {
numGetResourceTypeInfo.incr();
}
+
+ /**
+ * Increments the gauge that counts failed failApplicationAttempt calls.
+ */
+ public void incrFailAppAttemptFailedRetrieved() {
+ numFailAppAttemptFailedRetrieved.incr();
+ }
+
+ /**
+ * Increments the gauge that counts failed updateApplicationPriority calls.
+ */
+ public void incrUpdateAppPriorityFailedRetrieved() {
+ numUpdateAppPriorityFailedRetrieved.incr();
+ }
+
+ /**
+ * Increments the gauge that counts failed updateApplicationTimeouts calls
+ * (numUpdateAppTimeoutsFailedRetrieved). Unlike the sibling incr*Failed*
+ * methods the name omits "Failed", but this is the failure counter, not a
+ * success counter.
+ */
+ public void incrUpdateApplicationTimeoutsRetrieved() {
+ numUpdateAppTimeoutsFailedRetrieved.incr();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index f92e3566ca2..fec62d4b080 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -1213,14 +1213,92 @@ public class FederationClientInterceptor
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ if (request == null || request.getApplicationAttemptId() == null
+ || request.getApplicationAttemptId().getApplicationId() == null) {
+ routerMetrics.incrFailAppAttemptFailedRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing failApplicationAttempt request or applicationId " +
+ "or applicationAttemptId information.", null);
+ }
+ long startTime = clock.getTime();
+ SubClusterId subClusterId = null;
+ ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId();
+
+ try {
+ subClusterId = getApplicationHomeSubCluster(applicationId);
+ } catch (YarnException e) {
+ routerMetrics.incrFailAppAttemptFailedRetrieved();
+ RouterServerUtil.logAndThrowException("ApplicationAttempt " +
+ request.getApplicationAttemptId() + " belongs to Application " +
+ request.getApplicationAttemptId().getApplicationId() +
+ " does not exist in FederationStateStore.", e);
+ }
+
+ ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+ FailApplicationAttemptResponse response = null;
+ try {
+ response = clientRMProxy.failApplicationAttempt(request);
+ } catch (Exception e) {
+ routerMetrics.incrFailAppAttemptFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Unable to get the applicationAttempt report for " +
+ request.getApplicationAttemptId() + " to SubCluster " + subClusterId.getId(), e);
+ }
+
+ if (response == null) {
+ LOG.error("No response when attempting to retrieve the report of " +
+ "the applicationAttempt {} to SubCluster {}.",
+ request.getApplicationAttemptId(), subClusterId.getId());
+ }
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededFailAppAttemptRetrieved(stopTime - startTime);
+ return response;
}
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ if (request == null || request.getApplicationId() == null
+ || request.getApplicationPriority() == null) {
+ routerMetrics.incrUpdateAppPriorityFailedRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing updateApplicationPriority request or applicationId " +
+ "or applicationPriority information.", null);
+ }
+
+ long startTime = clock.getTime();
+ SubClusterId subClusterId = null;
+ ApplicationId applicationId = request.getApplicationId();
+
+ try {
+ subClusterId = getApplicationHomeSubCluster(applicationId);
+ } catch (YarnException e) {
+ routerMetrics.incrUpdateAppPriorityFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Application " +
+ request.getApplicationId() +
+ " does not exist in FederationStateStore.", e);
+ }
+
+ ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+ UpdateApplicationPriorityResponse response = null;
+ try {
+ response = clientRMProxy.updateApplicationPriority(request);
+ } catch (Exception e) {
+ routerMetrics.incrFailAppAttemptFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Unable to update application priority for " +
+ request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
+ }
+
+ if (response == null) {
+ LOG.error("No response when update application priority of " +
+ "the applicationId {} to SubCluster {}.",
+ applicationId, subClusterId.getId());
+ }
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime);
+ return response;
}
@Override
@@ -1233,7 +1311,45 @@ public class FederationClientInterceptor
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ // Updates an application's timeouts on its home sub-cluster. Every failure
+ // path below increments the updateApplicationTimeouts failure metric before
+ // delegating to RouterServerUtil.logAndThrowException; a call that returns
+ // records its latency. A null response is logged and returned as-is.
+ if (request == null || request.getApplicationId() == null
+ || request.getApplicationTimeouts() == null) {
+ routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing updateApplicationTimeouts request or applicationId " +
+ "or applicationTimeouts information.", null);
+ }
+
+ long startTime = clock.getTime();
+ SubClusterId subClusterId = null;
+ ApplicationId applicationId = request.getApplicationId();
+ try {
+ subClusterId = getApplicationHomeSubCluster(applicationId);
+ } catch (YarnException e) {
+ // Count against this operation's own failure metric, not failApplicationAttempt's.
+ routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
+ RouterServerUtil.logAndThrowException("Application " +
+ request.getApplicationId() +
+ " does not exist in FederationStateStore.", e);
+ }
+
+ ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+ UpdateApplicationTimeoutsResponse response = null;
+ try {
+ response = clientRMProxy.updateApplicationTimeouts(request);
+ } catch (Exception e) {
+ // Same failure metric as the other failure paths of this method.
+ routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
+ RouterServerUtil.logAndThrowException("Unable to update application timeout for " +
+ request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
+ }
+
+ if (response == null) {
+ LOG.error("No response when update application timeout of " +
+ "the applicationId {} to SubCluster {}.",
+ applicationId, subClusterId.getId());
+ }
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime);
+ return response;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index a4df82f9dcb..4b1049e8b64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -398,6 +398,21 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getResourceTypeInfo call");
metrics.incrResourceTypeInfoFailedRetrieved();
}
+
+ public void getFailApplicationAttempt() {
+ LOG.info("Mocked: failed failApplicationAttempt call");
+ metrics.incrFailAppAttemptFailedRetrieved();
+ }
+
+ public void getUpdateApplicationPriority() {
+ LOG.info("Mocked: failed updateApplicationPriority call");
+ metrics.incrUpdateAppPriorityFailedRetrieved();
+ }
+
+ public void getUpdateApplicationTimeouts() {
+ LOG.info("Mocked: failed updateApplicationTimeouts call");
+ metrics.incrUpdateApplicationTimeoutsRetrieved();
+ }
}
// Records successes for all calls
@@ -493,6 +508,21 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration);
metrics.succeededGetResourceTypeInfoRetrieved(duration);
}
+
+ public void getFailApplicationAttempt(long duration) {
+ LOG.info("Mocked: successful failApplicationAttempt call with duration {}", duration);
+ metrics.succeededFailAppAttemptRetrieved(duration);
+ }
+
+ public void getUpdateApplicationPriority(long duration) {
+ LOG.info("Mocked: successful updateApplicationPriority call with duration {}", duration);
+ metrics.succeededUpdateAppPriorityRetrieved(duration);
+ }
+
+ public void getUpdateApplicationTimeouts(long duration) {
+ LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
+ metrics.succeededUpdateAppTimeoutsRetrieved(duration);
+ }
}
@Test
@@ -708,4 +738,72 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved());
}
+ @Test
+ public void testSucceededFailApplicationAttempt() {
+ long totalGoodBefore = metrics.getNumSucceededFailAppAttemptRetrieved();
+ goodSubCluster.getFailApplicationAttempt(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededFailAppAttemptRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getFailApplicationAttempt(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededFailAppAttemptRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testFailApplicationAttemptFailed() {
+ long totalBadBefore = metrics.getFailApplicationAttemptFailedRetrieved();
+ badSubCluster.getFailApplicationAttempt();
+ Assert.assertEquals(totalBadBefore + 1, metrics.getFailApplicationAttemptFailedRetrieved());
+ }
+
+ @Test
+ public void testSucceededUpdateApplicationPriority() {
+ long totalGoodBefore = metrics.getNumSucceededUpdateAppPriorityRetrieved();
+ goodSubCluster.getUpdateApplicationPriority(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededUpdateAppPriorityRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getUpdateApplicationPriority(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededUpdateAppPriorityRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testUpdateApplicationPriorityFailed() {
+ long totalBadBefore = metrics.getUpdateApplicationPriorityFailedRetrieved();
+ badSubCluster.getUpdateApplicationPriority();
+ Assert.assertEquals(totalBadBefore + 1,
+ metrics.getUpdateApplicationPriorityFailedRetrieved());
+ }
+
+ @Test
+ public void testSucceededUpdateAppTimeoutsRetrieved() {
+ long totalGoodBefore = metrics.getNumSucceededUpdateAppTimeoutsRetrieved();
+ goodSubCluster.getUpdateApplicationTimeouts(150);
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededUpdateAppTimeoutsRetrieved());
+ Assert.assertEquals(150,
+ metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
+ goodSubCluster.getUpdateApplicationTimeouts(300);
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededUpdateAppTimeoutsRetrieved());
+ Assert.assertEquals(225,
+ metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
+ }
+
+ @Test
+ public void testUpdateAppTimeoutsFailed() {
+ long totalBadBefore = metrics.getUpdateApplicationTimeoutsFailedRetrieved();
+ badSubCluster.getUpdateApplicationTimeouts();
+ Assert.assertEquals(totalBadBefore + 1,
+ metrics.getUpdateApplicationTimeoutsFailedRetrieved());
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 8fa52e8f92b..9ead9fbe721 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.Set;
import java.util.stream.Collectors;
@@ -65,6 +66,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -75,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -82,7 +90,12 @@ import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationState
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@@ -109,6 +122,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private final static int NUM_SUBCLUSTER = 4;
+ private final static int APP_PRIORITY_ZERO = 0;
+
@Override
public void setUp() {
super.setUpConfig();
@@ -212,7 +227,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, MockApps.newAppName(), "default",
- Priority.newInstance(0), amContainerSpec, false, false, -1,
+ Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
"MockApp");
@@ -898,4 +913,147 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance());
Assert.assertEquals(2, response.getResourceTypeInfo().size());
}
+
+ @Test
+ public void testFailApplicationAttempt() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Fail Application Attempt request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " +
+ "or applicationId or applicationAttemptId information.",
+ () -> interceptor.failApplicationAttempt(null));
+
+ // normal request
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+ // Submit the application
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+ Assert.assertNotNull(subClusterId);
+
+ MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+ mockRM.waitForState(appId, RMAppState.ACCEPTED);
+ RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+ mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.SCHEDULED);
+
+ // Call GetApplicationAttempts
+ GetApplicationAttemptsRequest attemptsRequest =
+ GetApplicationAttemptsRequest.newInstance(appId);
+ GetApplicationAttemptsResponse attemptsResponse =
+ interceptor.getApplicationAttempts(attemptsRequest);
+ Assert.assertNotNull(attemptsResponse);
+
+ ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList().
+ get(0).getApplicationAttemptId();
+
+ FailApplicationAttemptRequest requestFailAppAttempt =
+ FailApplicationAttemptRequest.newInstance(attemptId);
+ FailApplicationAttemptResponse responseFailAppAttempt =
+ interceptor.failApplicationAttempt(requestFailAppAttempt);
+
+ Assert.assertNotNull(responseFailAppAttempt);
+ }
+
+ @Test
+ public void testUpdateApplicationPriority() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Update Application Priority request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " +
+ "or applicationId or applicationPriority information.",
+ () -> interceptor.updateApplicationPriority(null));
+
+ // normal request
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+ // Submit the application
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+ Assert.assertNotNull(subClusterId);
+
+ MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+ mockRM.waitForState(appId, RMAppState.ACCEPTED);
+ RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+ mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.SCHEDULED);
+
+ // Call GetApplicationAttempts
+ GetApplicationAttemptsRequest attemptsRequest =
+ GetApplicationAttemptsRequest.newInstance(appId);
+ GetApplicationAttemptsResponse attemptsResponse =
+ interceptor.getApplicationAttempts(attemptsRequest);
+ Assert.assertNotNull(attemptsResponse);
+
+ Priority priority = Priority.newInstance(20);
+ UpdateApplicationPriorityRequest requestUpdateAppPriority =
+ UpdateApplicationPriorityRequest.newInstance(appId, priority);
+ UpdateApplicationPriorityResponse responseAppPriority =
+ interceptor.updateApplicationPriority(requestUpdateAppPriority);
+
+ Assert.assertNotNull(responseAppPriority);
+ Assert.assertEquals(20,
+ responseAppPriority.getApplicationPriority().getPriority());
+ }
+
+ /**
+ * Checks updateApplicationTimeouts through the interceptor: a null request
+ * must be rejected with the "Missing ..." message, and a LIFETIME timeout
+ * set on a submitted application must be echoed back in the response.
+ */
+ @Test
+ public void testUpdateApplicationTimeouts() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Update Application Timeouts request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " +
+ "or applicationId or applicationTimeouts information.",
+ () -> interceptor.updateApplicationTimeouts(null));
+
+ // normal request
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+ // Submit the application
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+ Assert.assertNotNull(subClusterId);
+
+ MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+ mockRM.waitForState(appId, RMAppState.ACCEPTED);
+ RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+ mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.SCHEDULED);
+
+ // Call GetApplicationAttempts
+ GetApplicationAttemptsRequest attemptsRequest =
+ GetApplicationAttemptsRequest.newInstance(appId);
+ GetApplicationAttemptsResponse attemptsResponse =
+ interceptor.getApplicationAttempts(attemptsRequest);
+ Assert.assertNotNull(attemptsResponse);
+
+ String appTimeout =
+ Times.formatISO8601(System.currentTimeMillis() + 5 * 1000);
+ Map<ApplicationTimeoutType, String> applicationTimeouts = new HashMap<>();
+ applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout);
+
+ UpdateApplicationTimeoutsRequest timeoutsRequest =
+ UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts);
+ UpdateApplicationTimeoutsResponse timeoutsResponse =
+ interceptor.updateApplicationTimeouts(timeoutsRequest);
+
+ // Null-check the response before dereferencing it, so a missing response
+ // fails the assertion instead of raising a NullPointerException.
+ Assert.assertNotNull(timeoutsResponse);
+ String responseTimeOut =
+ timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
+ Assert.assertEquals(appTimeout, responseTimeOut);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index c97d0532425..202a286696a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -115,4 +115,7 @@ public class TestableFederationClientInterceptor
}
}
+ public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
+ return mockRMs;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml
index f3e0de3604b..310a1612486 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml
@@ -27,4 +27,8 @@
<name>yarn.resourcemanager.webapp.address</name>
<value>0.0.0.0:8080</value>
</property>
+ <property>
+ <name>yarn.cluster.max-application-priority</name>
+ <value>50</value>
+ </property>
</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org