You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/06/09 00:34:53 UTC

[hadoop] branch trunk updated: YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98ca6fa10a9 YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)
98ca6fa10a9 is described below

commit 98ca6fa10a93257dc7a72d534128864ff159cc88
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Wed Jun 8 17:34:43 2022 -0700

    YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)
---
 .../hadoop/yarn/server/router/RouterMetrics.java   | 109 +++++++++++++-
 .../clientrm/FederationClientInterceptor.java      | 122 +++++++++++++++-
 .../yarn/server/router/TestRouterMetrics.java      |  98 +++++++++++++
 .../clientrm/TestFederationClientInterceptor.java  | 160 ++++++++++++++++++++-
 .../TestableFederationClientInterceptor.java       |   3 +
 .../src/test/resources/yarn-site.xml               |   4 +
 6 files changed, 487 insertions(+), 9 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 1399b2f4b22..b02b3e155fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -75,6 +75,12 @@ public final class RouterMetrics {
   private MutableGaugeInt numListReservationsFailedRetrieved;
   @Metric("# of getResourceTypeInfo failed to be retrieved")
   private MutableGaugeInt numGetResourceTypeInfo;
+  @Metric("# of failApplicationAttempt failed to be retrieved")
+  private MutableGaugeInt numFailAppAttemptFailedRetrieved;
+  @Metric("# of updateApplicationPriority failed to be retrieved")
+  private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
+  @Metric("# of updateApplicationTimeouts failed to be retrieved")
+  private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -114,6 +120,12 @@ public final class RouterMetrics {
   private MutableRate totalSucceededListReservationsRetrieved;
   @Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)")
   private MutableRate totalSucceededGetResourceTypeInfoRetrieved;
+  @Metric("Total number of successful Retrieved failApplicationAttempt and latency(ms)")
+  private MutableRate totalSucceededFailAppAttemptRetrieved;
+  @Metric("Total number of successful Retrieved updateApplicationPriority and latency(ms)")
+  private MutableRate totalSucceededUpdateAppPriorityRetrieved;
+  @Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
+  private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -135,8 +147,11 @@ public final class RouterMetrics {
   private MutableQuantiles getContainerLatency;
   private MutableQuantiles listReservationsLatency;
   private MutableQuantiles listResourceTypeInfoLatency;
+  private MutableQuantiles failAppAttemptLatency;
+  private MutableQuantiles updateAppPriorityLatency;
+  private MutableQuantiles updateAppTimeoutsLatency;
 
-  private static volatile RouterMetrics INSTANCE = null;
+  private static volatile RouterMetrics instance = null;
   private static MetricsRegistry registry;
 
   private RouterMetrics() {
@@ -201,25 +216,37 @@ public final class RouterMetrics {
     listResourceTypeInfoLatency =
         registry.newQuantiles("getResourceTypeInfoLatency",
             "latency of get resource type info", "ops", "latency", 10);
+
+    failAppAttemptLatency =
+        registry.newQuantiles("failApplicationAttemptLatency",
+            "latency of fail application attempt", "ops", "latency", 10);
+
+    updateAppPriorityLatency =
+        registry.newQuantiles("updateApplicationPriorityLatency",
+            "latency of update application priority", "ops", "latency", 10);
+
+    updateAppTimeoutsLatency =
+        registry.newQuantiles("updateApplicationTimeoutsLatency",
+            "latency of update application timeouts", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
     if (!isInitialized.get()) {
       synchronized (RouterMetrics.class) {
-        if (INSTANCE == null) {
-          INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
+        if (instance == null) {
+          instance = DefaultMetricsSystem.instance().register("RouterMetrics",
               "Metrics for the Yarn Router", new RouterMetrics());
           isInitialized.set(true);
         }
       }
     }
-    return INSTANCE;
+    return instance;
   }
 
   @VisibleForTesting
   synchronized static void destroy() {
     isInitialized.set(false);
-    INSTANCE = null;
+    instance = null;
   }
 
   @VisibleForTesting
@@ -307,6 +334,21 @@ public final class RouterMetrics {
     return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededFailAppAttemptRetrieved() {
+    return totalSucceededFailAppAttemptRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededUpdateAppPriorityRetrieved() {
+    return totalSucceededUpdateAppPriorityRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededUpdateAppTimeoutsRetrieved() {
+    return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
@@ -392,6 +434,21 @@ public final class RouterMetrics {
     return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededFailAppAttemptRetrieved() {
+    return totalSucceededFailAppAttemptRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededUpdateAppPriorityRetrieved() {
+    return totalSucceededUpdateAppPriorityRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededUpdateAppTimeoutsRetrieved() {
+    return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
@@ -477,6 +534,21 @@ public final class RouterMetrics {
     return numGetResourceTypeInfo.value();
   }
 
+  @VisibleForTesting
+  public int getFailApplicationAttemptFailedRetrieved() {
+    return numFailAppAttemptFailedRetrieved.value();
+  }
+
+  @VisibleForTesting
+  public int getUpdateApplicationPriorityFailedRetrieved() {
+    return numUpdateAppPriorityFailedRetrieved.value();
+  }
+
+  @VisibleForTesting
+  public int getUpdateApplicationTimeoutsFailedRetrieved() {
+    return numUpdateAppTimeoutsFailedRetrieved.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
@@ -562,6 +634,21 @@ public final class RouterMetrics {
     listResourceTypeInfoLatency.add(duration);
   }
 
+  public void succeededFailAppAttemptRetrieved(long duration) {
+    totalSucceededFailAppAttemptRetrieved.add(duration);
+    failAppAttemptLatency.add(duration);
+  }
+
+  public void succeededUpdateAppPriorityRetrieved(long duration) {
+    totalSucceededUpdateAppPriorityRetrieved.add(duration);
+    updateAppPriorityLatency.add(duration);
+  }
+
+  public void succeededUpdateAppTimeoutsRetrieved(long duration) {
+    totalSucceededUpdateAppTimeoutsRetrieved.add(duration);
+    updateAppTimeoutsLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -629,4 +716,16 @@ public final class RouterMetrics {
   public void incrResourceTypeInfoFailedRetrieved() {
     numGetResourceTypeInfo.incr();
   }
+
+  public void incrFailAppAttemptFailedRetrieved() {
+    numFailAppAttemptFailedRetrieved.incr();
+  }
+
+  public void incrUpdateAppPriorityFailedRetrieved() {
+    numUpdateAppPriorityFailedRetrieved.incr();
+  }
+
+  public void incrUpdateApplicationTimeoutsRetrieved() {
+    numUpdateAppTimeoutsFailedRetrieved.incr();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index f92e3566ca2..fec62d4b080 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -1213,14 +1213,92 @@ public class FederationClientInterceptor
   @Override
   public FailApplicationAttemptResponse failApplicationAttempt(
       FailApplicationAttemptRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null || request.getApplicationAttemptId() == null
+          || request.getApplicationAttemptId().getApplicationId() == null) {
+      routerMetrics.incrFailAppAttemptFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing failApplicationAttempt request or applicationId " +
+          "or applicationAttemptId information.", null);
+    }
+    long startTime = clock.getTime();
+    SubClusterId subClusterId = null;
+    ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId();
+
+    try {
+      subClusterId = getApplicationHomeSubCluster(applicationId);
+    } catch (YarnException e) {
+      routerMetrics.incrFailAppAttemptFailedRetrieved();
+      RouterServerUtil.logAndThrowException("ApplicationAttempt " +
+          request.getApplicationAttemptId() + " belongs to Application " +
+          request.getApplicationAttemptId().getApplicationId() +
+          " does not exist in FederationStateStore.", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+    FailApplicationAttemptResponse response = null;
+    try {
+      response = clientRMProxy.failApplicationAttempt(request);
+    } catch (Exception e) {
+      routerMetrics.incrFailAppAttemptFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to fail the applicationAttempt " +
+          request.getApplicationAttemptId() + " in SubCluster " + subClusterId.getId(), e);
+    }
+
+    if (response == null) {
+      LOG.error("No response when attempting to fail " +
+          "the applicationAttempt {} in SubCluster {}.",
+          request.getApplicationAttemptId(), subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededFailAppAttemptRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
   public UpdateApplicationPriorityResponse updateApplicationPriority(
       UpdateApplicationPriorityRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null || request.getApplicationId() == null
+            || request.getApplicationPriority() == null) {
+      routerMetrics.incrUpdateAppPriorityFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing updateApplicationPriority request or applicationId " +
+          "or applicationPriority information.", null);
+    }
+
+    long startTime = clock.getTime();
+    SubClusterId subClusterId = null;
+    ApplicationId applicationId = request.getApplicationId();
+
+    try {
+      subClusterId = getApplicationHomeSubCluster(applicationId);
+    } catch (YarnException e) {
+      routerMetrics.incrUpdateAppPriorityFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Application " +
+              request.getApplicationId() +
+              " does not exist in FederationStateStore.", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+    UpdateApplicationPriorityResponse response = null;
+    try {
+      response = clientRMProxy.updateApplicationPriority(request);
+    } catch (Exception e) {
+      routerMetrics.incrUpdateAppPriorityFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to update application priority for " +
+          request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
+    }
+
+    if (response == null) {
+      LOG.error("No response when update application priority of " +
+           "the applicationId {} to SubCluster {}.",
+           applicationId, subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
@@ -1233,7 +1311,45 @@ public class FederationClientInterceptor
   public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
       UpdateApplicationTimeoutsRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null || request.getApplicationId() == null
+            || request.getApplicationTimeouts() == null) {
+      routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing updateApplicationTimeouts request or applicationId " +
+          "or applicationTimeouts information.", null);
+    }
+
+    long startTime = clock.getTime();
+    SubClusterId subClusterId = null;
+    ApplicationId applicationId = request.getApplicationId();
+    try {
+      subClusterId = getApplicationHomeSubCluster(applicationId);
+    } catch (YarnException e) {
+      routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
+      RouterServerUtil.logAndThrowException("Application " +
+          request.getApplicationId() +
+          " does not exist in FederationStateStore.", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+    UpdateApplicationTimeoutsResponse response = null;
+    try {
+      response = clientRMProxy.updateApplicationTimeouts(request);
+    } catch (Exception e) {
+      routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to update application timeout for " +
+          request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
+    }
+
+    if (response == null) {
+      LOG.error("No response when update application timeout of " +
+          "the applicationId {} to SubCluster {}.",
+          applicationId, subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index a4df82f9dcb..4b1049e8b64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -398,6 +398,21 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed getResourceTypeInfo call");
       metrics.incrResourceTypeInfoFailedRetrieved();
     }
+
+    public void getFailApplicationAttempt() {
+      LOG.info("Mocked: failed failApplicationAttempt call");
+      metrics.incrFailAppAttemptFailedRetrieved();
+    }
+
+    public void getUpdateApplicationPriority() {
+      LOG.info("Mocked: failed updateApplicationPriority call");
+      metrics.incrUpdateAppPriorityFailedRetrieved();
+    }
+
+    public void getUpdateApplicationTimeouts() {
+      LOG.info("Mocked: failed updateApplicationTimeouts call");
+      metrics.incrUpdateApplicationTimeoutsRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -493,6 +508,21 @@ public class TestRouterMetrics {
       LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration);
       metrics.succeededGetResourceTypeInfoRetrieved(duration);
     }
+
+    public void getFailApplicationAttempt(long duration) {
+      LOG.info("Mocked: successful failApplicationAttempt call with duration {}", duration);
+      metrics.succeededFailAppAttemptRetrieved(duration);
+    }
+
+    public void getUpdateApplicationPriority(long duration) {
+      LOG.info("Mocked: successful updateApplicationPriority call with duration {}", duration);
+      metrics.succeededUpdateAppPriorityRetrieved(duration);
+    }
+
+    public void getUpdateApplicationTimeouts(long duration) {
+      LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
+      metrics.succeededUpdateAppTimeoutsRetrieved(duration);
+    }
   }
 
   @Test
@@ -708,4 +738,72 @@ public class TestRouterMetrics {
     Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved());
   }
 
+  @Test
+  public void testSucceededFailApplicationAttempt() {
+    long totalGoodBefore = metrics.getNumSucceededFailAppAttemptRetrieved();
+    goodSubCluster.getFailApplicationAttempt(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededFailAppAttemptRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getFailApplicationAttempt(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededFailAppAttemptRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testFailApplicationAttemptFailed() {
+    long totalBadBefore = metrics.getFailApplicationAttemptFailedRetrieved();
+    badSubCluster.getFailApplicationAttempt();
+    Assert.assertEquals(totalBadBefore + 1, metrics.getFailApplicationAttemptFailedRetrieved());
+  }
+
+  @Test
+  public void testSucceededUpdateApplicationPriority() {
+    long totalGoodBefore = metrics.getNumSucceededUpdateAppPriorityRetrieved();
+    goodSubCluster.getUpdateApplicationPriority(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededUpdateAppPriorityRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getUpdateApplicationPriority(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededUpdateAppPriorityRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testUpdateApplicationPriorityFailed() {
+    long totalBadBefore = metrics.getUpdateApplicationPriorityFailedRetrieved();
+    badSubCluster.getUpdateApplicationPriority();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getUpdateApplicationPriorityFailedRetrieved());
+  }
+
+  @Test
+  public void testSucceededUpdateAppTimeoutsRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededUpdateAppTimeoutsRetrieved();
+    goodSubCluster.getUpdateApplicationTimeouts(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededUpdateAppTimeoutsRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getUpdateApplicationTimeouts(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededUpdateAppTimeoutsRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testUpdateAppTimeoutsFailed() {
+    long totalBadBefore = metrics.getUpdateApplicationTimeoutsFailedRetrieved();
+    badSubCluster.getUpdateApplicationTimeouts();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getUpdateApplicationTimeoutsFailedRetrieved());
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 8fa52e8f92b..9ead9fbe721 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -65,6 +66,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -75,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -82,7 +90,12 @@ import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationState
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
@@ -109,6 +122,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
 
   private final static int NUM_SUBCLUSTER = 4;
 
+  private final static int APP_PRIORITY_ZERO = 0;
+
   @Override
   public void setUp() {
     super.setUpConfig();
@@ -212,7 +227,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
     ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
     ApplicationSubmissionContext context = ApplicationSubmissionContext
         .newInstance(appId, MockApps.newAppName(), "default",
-            Priority.newInstance(0), amContainerSpec, false, false, -1,
+            Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
             Resources.createResource(
                 YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
             "MockApp");
@@ -898,4 +913,147 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
         interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance());
     Assert.assertEquals(2, response.getResourceTypeInfo().size());
   }
+
+  @Test
+  public void testFailApplicationAttempt() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Fail Application Attempt request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " +
+        "or applicationId or applicationAttemptId information.",
+        () -> interceptor.failApplicationAttempt(null));
+
+    // normal request
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+    Assert.assertNotNull(subClusterId);
+
+    MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+    mockRM.waitForState(appId, RMAppState.ACCEPTED);
+    RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+    mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.SCHEDULED);
+
+    // Call GetApplicationAttempts
+    GetApplicationAttemptsRequest attemptsRequest =
+        GetApplicationAttemptsRequest.newInstance(appId);
+    GetApplicationAttemptsResponse attemptsResponse =
+        interceptor.getApplicationAttempts(attemptsRequest);
+    Assert.assertNotNull(attemptsResponse);
+
+    ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList().
+        get(0).getApplicationAttemptId();
+
+    FailApplicationAttemptRequest requestFailAppAttempt =
+        FailApplicationAttemptRequest.newInstance(attemptId);
+    FailApplicationAttemptResponse responseFailAppAttempt =
+        interceptor.failApplicationAttempt(requestFailAppAttempt);
+
+    Assert.assertNotNull(responseFailAppAttempt);
+  }
+
+  @Test
+  public void testUpdateApplicationPriority() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Update Application Priority request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " +
+        "or applicationId or applicationPriority information.",
+        () -> interceptor.updateApplicationPriority(null));
+
+    // normal request
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+    Assert.assertNotNull(subClusterId);
+
+    MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+    mockRM.waitForState(appId, RMAppState.ACCEPTED);
+    RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+    mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.SCHEDULED);
+
+    // Call GetApplicationAttempts
+    GetApplicationAttemptsRequest attemptsRequest =
+        GetApplicationAttemptsRequest.newInstance(appId);
+    GetApplicationAttemptsResponse attemptsResponse =
+        interceptor.getApplicationAttempts(attemptsRequest);
+    Assert.assertNotNull(attemptsResponse);
+
+    Priority priority = Priority.newInstance(20);
+    UpdateApplicationPriorityRequest requestUpdateAppPriority =
+        UpdateApplicationPriorityRequest.newInstance(appId, priority);
+    UpdateApplicationPriorityResponse responseAppPriority =
+        interceptor.updateApplicationPriority(requestUpdateAppPriority);
+
+    Assert.assertNotNull(responseAppPriority);
+    Assert.assertEquals(20,
+        responseAppPriority.getApplicationPriority().getPriority());
+  }
+
+  @Test
+  public void testUpdateApplicationTimeouts() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Update Application Timeouts request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " +
+        "or applicationId or applicationTimeouts information.",
+        () -> interceptor.updateApplicationTimeouts(null));
+
+    // normal request
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+    Assert.assertNotNull(subClusterId);
+
+    MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+    mockRM.waitForState(appId, RMAppState.ACCEPTED);
+    RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+    mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.SCHEDULED);
+
+    // Call GetApplicationAttempts
+    GetApplicationAttemptsRequest attemptsRequest =
+        GetApplicationAttemptsRequest.newInstance(appId);
+    GetApplicationAttemptsResponse attemptsResponse =
+        interceptor.getApplicationAttempts(attemptsRequest);
+    Assert.assertNotNull(attemptsResponse);
+
+    String appTimeout =
+        Times.formatISO8601(System.currentTimeMillis() + 5 * 1000);
+    Map<ApplicationTimeoutType, String> applicationTimeouts = new HashMap<>();
+    applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout);
+
+    UpdateApplicationTimeoutsRequest timeoutsRequest =
+        UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts);
+    UpdateApplicationTimeoutsResponse timeoutsResponse =
+        interceptor.updateApplicationTimeouts(timeoutsRequest);
+
+    Assert.assertNotNull(timeoutsResponse);
+    String responseTimeOut =
+        timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
+    Assert.assertEquals(appTimeout, responseTimeOut);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index c97d0532425..202a286696a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -115,4 +115,7 @@ public class TestableFederationClientInterceptor
     }
   }
 
+  public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
+    return mockRMs;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml
index f3e0de3604b..310a1612486 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml
@@ -27,4 +27,8 @@
     <name>yarn.resourcemanager.webapp.address</name>
     <value>0.0.0.0:8080</value>
   </property>
+  <property>
+    <name>yarn.cluster.max-application-priority</name>
+    <value>50</value>
+  </property>
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org