You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 01:20:11 UTC
[6/8] hadoop git commit: YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)
YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)
(cherry picked from commit ae8fb13b312b30de50d65b5450b565d50d690e9e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2aacb9d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2aacb9d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2aacb9d3
Branch: refs/heads/branch-2
Commit: 2aacb9d3fbf21308daff828639be10acbcd5e5cc
Parents: ac090b3
Author: Arun Suresh <as...@apache.org>
Authored: Mon Aug 21 22:50:24 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:37 2017 -0700
----------------------------------------------------------------------
.../yarn/server/router/RouterMetrics.java | 203 +++++++++++++++
.../clientrm/FederationClientInterceptor.java | 37 ++-
.../webapp/FederationInterceptorREST.java | 116 +++++++--
.../yarn/server/router/TestRouterMetrics.java | 248 +++++++++++++++++++
.../webapp/TestFederationInterceptorREST.java | 12 +-
5 files changed, 593 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
new file mode 100644
index 0000000..42361a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Maintains Router Federation Interceptor activity statistics (failure counts,
+ * success counts/latencies, latency quantiles) and publishes them via metrics2.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Metrics for Router Federation Interceptor", context = "fedr")
+public final class RouterMetrics {
+
+ private static final MetricsInfo RECORD_INFO =
+ info("RouterMetrics", "Router Federation Interceptor");
+ private static AtomicBoolean isInitialized = new AtomicBoolean(false); // fast-path flag for getMetrics()
+ // @Metric fields are never assigned in this class; presumably metrics2 instantiates them on register().
+ // Failure counters: one gauge per operation, bumped by the incrAppsFailed*() methods.
+ @Metric("# of applications failed to be submitted")
+ private MutableGaugeInt numAppsFailedSubmitted;
+ @Metric("# of applications failed to be created")
+ private MutableGaugeInt numAppsFailedCreated;
+ @Metric("# of applications failed to be killed")
+ private MutableGaugeInt numAppsFailedKilled;
+ @Metric("# of application reports failed to be retrieved")
+ private MutableGaugeInt numAppsFailedRetrieved;
+
+ // Success rates: one sample (the call duration) per succeededApps*() call.
+ @Metric("Total number of successful Submitted apps and latency(ms)")
+ private MutableRate totalSucceededAppsSubmitted;
+ @Metric("Total number of successful Killed apps and latency(ms)")
+ private MutableRate totalSucceededAppsKilled;
+ @Metric("Total number of successful Created apps and latency(ms)")
+ private MutableRate totalSucceededAppsCreated;
+ @Metric("Total number of successful Retrieved app reports and latency(ms)")
+ private MutableRate totalSucceededAppsRetrieved;
+
+ /**
+ * Latency quantiles per operation; created explicitly in the constructor.
+ */
+ private MutableQuantiles submitApplicationLatency;
+ private MutableQuantiles getNewApplicationLatency;
+ private MutableQuantiles killApplicationLatency;
+ private MutableQuantiles getApplicationReportLatency;
+
+ private static volatile RouterMetrics INSTANCE = null; // read outside the lock in getMetrics()
+ private static MetricsRegistry registry; // NOTE(review): static, yet reassigned by every constructor call
+ // Builds the registry and the four latency quantiles; only getMetrics() calls this.
+ private RouterMetrics() {
+ registry = new MetricsRegistry(RECORD_INFO);
+ registry.tag(RECORD_INFO, "Router");
+ getNewApplicationLatency = registry.newQuantiles("getNewApplicationLatency",
+ "latency of get new application", "ops", "latency", 10); // 10: rollover interval (seconds per metrics2 API; confirm)
+ submitApplicationLatency = registry.newQuantiles("submitApplicationLatency",
+ "latency of submit application", "ops", "latency", 10);
+ killApplicationLatency = registry.newQuantiles("killApplicationLatency",
+ "latency of kill application", "ops", "latency", 10);
+ getApplicationReportLatency =
+ registry.newQuantiles("getApplicationReportLatency",
+ "latency of get application report", "ops", "latency", 10);
+ }
+ // Returns the shared instance, creating and registering it with DefaultMetricsSystem on first use.
+ public static RouterMetrics getMetrics() {
+ if (!isInitialized.get()) { // unsynchronized fast path
+ synchronized (RouterMetrics.class) {
+ if (INSTANCE == null) { // re-checked under the class lock
+ INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
+ "Metrics for the Yarn Router", new RouterMetrics());
+ isInitialized.set(true); // set only after INSTANCE has been assigned
+ }
+ }
+ }
+ return INSTANCE;
+ }
+ // Test-only reset of the singleton; the metrics source is NOT unregistered.
+ @VisibleForTesting
+ synchronized static void destroy() {
+ isInitialized.set(false);
+ INSTANCE = null; // NOTE(review): next getMetrics() re-registers "RouterMetrics"; confirm duplicates are allowed
+ }
+ // Success counts: numSamples() of each rate's lastStat() (NOTE(review): may be per-interval).
+ @VisibleForTesting
+ public long getNumSucceededAppsCreated() {
+ return totalSucceededAppsCreated.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededAppsSubmitted() {
+ return totalSucceededAppsSubmitted.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededAppsKilled() {
+ return totalSucceededAppsKilled.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ public long getNumSucceededAppsRetrieved() {
+ return totalSucceededAppsRetrieved.lastStat().numSamples();
+ }
+ // Mean recorded duration per operation, read from each rate's lastStat().
+ @VisibleForTesting
+ public double getLatencySucceededAppsCreated() {
+ return totalSucceededAppsCreated.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededAppsSubmitted() {
+ return totalSucceededAppsSubmitted.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededAppsKilled() {
+ return totalSucceededAppsKilled.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ public double getLatencySucceededGetAppReport() { // "Retrieved" rate; named unlike its siblings
+ return totalSucceededAppsRetrieved.lastStat().mean();
+ }
+ // Current values of the per-operation failure gauges.
+ @VisibleForTesting
+ public int getAppsFailedCreated() {
+ return numAppsFailedCreated.value();
+ }
+
+ @VisibleForTesting
+ public int getAppsFailedSubmitted() {
+ return numAppsFailedSubmitted.value();
+ }
+
+ @VisibleForTesting
+ public int getAppsFailedKilled() {
+ return numAppsFailedKilled.value();
+ }
+
+ @VisibleForTesting
+ public int getAppsFailedRetrieved() {
+ return numAppsFailedRetrieved.value();
+ }
+ // Record a success: duration (labelled ms in the @Metric text) feeds the rate and the quantiles.
+ public void succeededAppsCreated(long duration) {
+ totalSucceededAppsCreated.add(duration);
+ getNewApplicationLatency.add(duration);
+ }
+
+ public void succeededAppsSubmitted(long duration) {
+ totalSucceededAppsSubmitted.add(duration);
+ submitApplicationLatency.add(duration);
+ }
+
+ public void succeededAppsKilled(long duration) {
+ totalSucceededAppsKilled.add(duration);
+ killApplicationLatency.add(duration);
+ }
+
+ public void succeededAppsRetrieved(long duration) {
+ totalSucceededAppsRetrieved.add(duration);
+ getApplicationReportLatency.add(duration);
+ }
+ // Record a failure by incrementing the matching gauge.
+ public void incrAppsFailedCreated() {
+ numAppsFailedCreated.incr();
+ }
+
+ public void incrAppsFailedSubmitted() {
+ numAppsFailedSubmitted.incr();
+ }
+
+ public void incrAppsFailedKilled() {
+ numAppsFailedKilled.incr();
+ }
+
+ public void incrAppsFailedRetrieved() {
+ numAppsFailedRetrieved.incr();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 7268ebd..3a36eec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -98,7 +98,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSub
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,6 +133,8 @@ public class FederationClientInterceptor
private FederationStateStoreFacade federationFacade;
private Random rand;
private RouterPolicyFacade policyFacade;
+ private RouterMetrics routerMetrics;
+ private final Clock clock = new MonotonicClock();
@Override
public void init(String userName) {
@@ -153,7 +158,7 @@ public class FederationClientInterceptor
clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
-
+ routerMetrics = RouterMetrics.getMetrics();
}
@Override
@@ -220,6 +225,9 @@ public class FederationClientInterceptor
@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
+
+ long startTime = clock.getTime();
+
Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true);
@@ -238,6 +246,9 @@ public class FederationClientInterceptor
}
if (response != null) {
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsCreated(stopTime - startTime);
return response;
} else {
// Empty response from the ResourceManager.
@@ -247,6 +258,7 @@ public class FederationClientInterceptor
}
+ routerMetrics.incrAppsFailedCreated();
String errMsg = "Fail to create a new application.";
LOG.error(errMsg);
throw new YarnException(errMsg);
@@ -320,9 +332,13 @@ public class FederationClientInterceptor
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {
+
+ long startTime = clock.getTime();
+
if (request == null || request.getApplicationSubmissionContext() == null
|| request.getApplicationSubmissionContext()
.getApplicationId() == null) {
+ routerMetrics.incrAppsFailedSubmitted();
RouterServerUtil
.logAndThrowException("Missing submitApplication request or "
+ "applicationSubmissionContex information.", null);
@@ -350,6 +366,7 @@ public class FederationClientInterceptor
subClusterId =
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedSubmitted();
String message = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore";
RouterServerUtil.logAndThrowException(message, e);
@@ -368,6 +385,7 @@ public class FederationClientInterceptor
LOG.info("Application " + applicationId
+ " already submitted on SubCluster " + subClusterId);
} else {
+ routerMetrics.incrAppsFailedSubmitted();
RouterServerUtil.logAndThrowException(message, e);
}
}
@@ -388,6 +406,8 @@ public class FederationClientInterceptor
LOG.info("Application "
+ request.getApplicationSubmissionContext().getApplicationName()
+ " with appId " + applicationId + " submitted on " + subClusterId);
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsSubmitted(stopTime - startTime);
return response;
} else {
// Empty response from the ResourceManager.
@@ -396,6 +416,7 @@ public class FederationClientInterceptor
}
}
+ routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Application "
+ request.getApplicationSubmissionContext().getApplicationName()
+ " with appId " + applicationId + " failed to be submitted.";
@@ -423,7 +444,10 @@ public class FederationClientInterceptor
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
+ long startTime = clock.getTime();
+
if (request == null || request.getApplicationId() == null) {
+ routerMetrics.incrAppsFailedKilled();
RouterServerUtil.logAndThrowException(
"Missing forceKillApplication request or ApplicationId.", null);
}
@@ -434,6 +458,7 @@ public class FederationClientInterceptor
subClusterId = federationFacade
.getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedKilled();
RouterServerUtil.logAndThrowException("Application " + applicationId
+ " does not exist in FederationStateStore", e);
}
@@ -447,6 +472,7 @@ public class FederationClientInterceptor
+ subClusterId);
response = clientRMProxy.forceKillApplication(request);
} catch (Exception e) {
+ routerMetrics.incrAppsFailedKilled();
LOG.error("Unable to kill the application report for "
+ request.getApplicationId() + "to SubCluster "
+ subClusterId.getId(), e);
@@ -458,6 +484,8 @@ public class FederationClientInterceptor
+ applicationId + " to SubCluster " + subClusterId.getId());
}
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsKilled(stopTime - startTime);
return response;
}
@@ -481,7 +509,10 @@ public class FederationClientInterceptor
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
+ long startTime = clock.getTime();
+
if (request == null || request.getApplicationId() == null) {
+ routerMetrics.incrAppsFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getApplicationReport request or applicationId information.",
null);
@@ -493,6 +524,7 @@ public class FederationClientInterceptor
subClusterId = federationFacade
.getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedRetrieved();
RouterServerUtil
.logAndThrowException("Application " + request.getApplicationId()
+ " does not exist in FederationStateStore", e);
@@ -505,6 +537,7 @@ public class FederationClientInterceptor
try {
response = clientRMProxy.getApplicationReport(request);
} catch (Exception e) {
+ routerMetrics.incrAppsFailedRetrieved();
LOG.error("Unable to get the application report for "
+ request.getApplicationId() + "to SubCluster "
+ subClusterId.getId(), e);
@@ -517,6 +550,8 @@ public class FederationClientInterceptor
+ subClusterId.getId());
}
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsRetrieved(stopTime - startTime);
return response;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 8ecc19d..4c7d4b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -18,7 +18,19 @@
package org.apache.hadoop.yarn.server.router.webapp;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -36,20 +48,42 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@@ -66,6 +100,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private FederationStateStoreFacade federationFacade;
private Random rand;
private RouterPolicyFacade policyFacade;
+ private RouterMetrics routerMetrics;
+ private final Clock clock = new MonotonicClock();
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
@@ -88,6 +124,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
+ routerMetrics = RouterMetrics.getMetrics();
}
private SubClusterId getRandomActiveSubCluster(
@@ -191,10 +228,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
+
+ long startTime = clock.getTime();
+
Map<SubClusterId, SubClusterInfo> subClustersActive;
try {
subClustersActive = federationFacade.getSubClusters(true);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedCreated();
return Response.status(Status.INTERNAL_SERVER_ERROR)
.entity(e.getLocalizedMessage()).build();
}
@@ -207,6 +248,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
try {
subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedCreated();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
@@ -226,6 +268,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
if (response != null && response.getStatus() == 200) {
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsCreated(stopTime - startTime);
+
return response;
} else {
// Empty response from the ResourceManager.
@@ -236,6 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
String errMsg = "Fail to create a new application.";
LOG.error(errMsg);
+ routerMetrics.incrAppsFailedCreated();
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
@@ -308,7 +355,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
+
+ long startTime = clock.getTime();
+
if (newApp == null || newApp.getApplicationId() == null) {
+ routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Missing ApplicationSubmissionContextInfo or "
+ "applicationSubmissionContex information.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
@@ -318,6 +369,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
try {
applicationId = ApplicationId.fromString(newApp.getApplicationId());
} catch (IllegalArgumentException e) {
+ routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
.build();
}
@@ -333,6 +385,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
try {
subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
@@ -349,6 +402,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
subClusterId =
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore";
return Response.status(Status.SERVICE_UNAVAILABLE)
@@ -367,6 +421,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
} catch (YarnException e1) {
+ routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e1.getLocalizedMessage()).build();
}
@@ -374,6 +429,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
LOG.info("Application " + applicationId
+ " already submitted on SubCluster " + subClusterId);
} else {
+ routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
.build();
}
@@ -384,6 +440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
try {
subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedSubmitted();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(e.getLocalizedMessage()).build();
}
@@ -401,6 +458,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
if (response != null && response.getStatus() == 202) {
LOG.info("Application " + context.getApplicationName() + " with appId "
+ applicationId + " submitted on " + subClusterId);
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+
return response;
} else {
// Empty response from the ResourceManager.
@@ -409,6 +470,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
}
+ routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Application " + newApp.getApplicationName()
+ " with appId " + applicationId + " failed to be submitted.";
LOG.error(errMsg);
@@ -435,10 +497,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
+ long startTime = clock.getTime();
+
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(appId);
} catch (IllegalArgumentException e) {
+ routerMetrics.incrAppsFailedRetrieved();
return null;
}
@@ -448,16 +513,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
subClusterId =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == null) {
+ routerMetrics.incrAppsFailedRetrieved();
return null;
}
subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) {
+ routerMetrics.incrAppsFailedRetrieved();
return null;
}
- return getOrCreateInterceptorForSubCluster(subClusterId,
+ AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
unselectedFields);
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+ return response;
}
/**
@@ -481,23 +553,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
+ long startTime = clock.getTime();
+
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(appId);
} catch (IllegalArgumentException e) {
+ routerMetrics.incrAppsFailedKilled();
return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
.build();
}
- SubClusterId subClusterId =
- federationFacade.getApplicationHomeSubCluster(applicationId);
-
- SubClusterInfo subClusterInfo =
- federationFacade.getSubCluster(subClusterId);
+ SubClusterInfo subClusterInfo = null;
+ SubClusterId subClusterId = null;
+ try {
+ subClusterId =
+ federationFacade.getApplicationHomeSubCluster(applicationId);
+ subClusterInfo = federationFacade.getSubCluster(subClusterId);
+ } catch (YarnException e) {
+ routerMetrics.incrAppsFailedKilled();
+ return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+ .build();
+ }
- return getOrCreateInterceptorForSubCluster(subClusterId,
+ Response response = getOrCreateInterceptorForSubCluster(subClusterId,
subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
hsr, appId);
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+ return response;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
new file mode 100644
index 0000000..3cdafd8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class validates the correctness of Router Federation Interceptor
+ * Metrics.
+ *
+ * <p>All tests share the object returned by {@link RouterMetrics#getMetrics()}.
+ * {@link #init()} expects every counter to still be zero, so this class
+ * presumably has to run against a freshly created metrics object (e.g. in its
+ * own JVM fork) -- TODO confirm against RouterMetrics.
+ *
+ * <p>The success tests expect each latency getter to report the mean of the
+ * durations recorded so far (100, then (100 + 200) / 2 = 150). Each latency
+ * metric must therefore be fed by exactly one test.
+ */
+public class TestRouterMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterMetrics.class);
+
+  // Mock sub-cluster: every operation records a failure.
+  private final MockBadSubCluster badSubCluster = new MockBadSubCluster();
+  // Mock sub-cluster: every operation records a success with a given latency.
+  private final MockGoodSubCluster goodSubCluster = new MockGoodSubCluster();
+
+  private static final RouterMetrics metrics = RouterMetrics.getMetrics();
+
+  /**
+   * Checks that every success and failure counter starts at zero before any
+   * test records a value.
+   */
+  @BeforeClass
+  public static void init() {
+
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0, metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
+
+    Assert.assertEquals(0, metrics.getAppsFailedCreated());
+    Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
+    Assert.assertEquals(0, metrics.getAppsFailedKilled());
+    Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * This test validates the correctness of the metric: Created Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsCreated() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsCreated();
+
+    goodSubCluster.getNewApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsCreated(), 0);
+
+    goodSubCluster.getNewApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsCreated(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to create Apps.
+   */
+  @Test
+  public void testAppsFailedCreated() {
+
+    long totalBadBefore = metrics.getAppsFailedCreated();
+
+    badSubCluster.getNewApplication();
+
+    Assert.assertEquals(totalBadBefore + 1, metrics.getAppsFailedCreated());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Submitted Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsSubmitted() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsSubmitted();
+
+    goodSubCluster.submitApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsSubmitted(), 0);
+
+    goodSubCluster.submitApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsSubmitted(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to submit Apps.
+   */
+  @Test
+  public void testAppsFailedSubmitted() {
+
+    long totalBadBefore = metrics.getAppsFailedSubmitted();
+
+    badSubCluster.submitApplication();
+
+    Assert.assertEquals(totalBadBefore + 1, metrics.getAppsFailedSubmitted());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Killed Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsKilled() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsKilled();
+
+    goodSubCluster.forceKillApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsKilled(), 0);
+
+    goodSubCluster.forceKillApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsKilled(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to kill Apps.
+   */
+  @Test
+  public void testAppsFailedKilled() {
+
+    long totalBadBefore = metrics.getAppsFailedKilled();
+
+    badSubCluster.forceKillApplication();
+
+    Assert.assertEquals(totalBadBefore + 1, metrics.getAppsFailedKilled());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Retrieved Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsReport() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsRetrieved();
+
+    goodSubCluster.getApplicationReport(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(100, metrics.getLatencySucceededGetAppReport(), 0);
+
+    goodSubCluster.getApplicationReport(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(150, metrics.getLatencySucceededGetAppReport(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to retrieve Apps.
+   */
+  @Test
+  public void testAppsReportFailed() {
+
+    long totalBadBefore = metrics.getAppsFailedRetrieved();
+
+    badSubCluster.getApplicationReport();
+
+    Assert.assertEquals(totalBadBefore + 1, metrics.getAppsFailedRetrieved());
+  }
+
+  /**
+   * Mock sub-cluster whose calls only record failures in the shared metrics.
+   * It is static because it needs no enclosing test instance.
+   */
+  private static class MockBadSubCluster {
+    public void getNewApplication() {
+      LOG.info("Mocked: failed getNewApplication call");
+      metrics.incrAppsFailedCreated();
+    }
+
+    public void submitApplication() {
+      LOG.info("Mocked: failed submitApplication call");
+      metrics.incrAppsFailedSubmitted();
+    }
+
+    public void forceKillApplication() {
+      LOG.info("Mocked: failed forceKillApplication call");
+      metrics.incrAppsFailedKilled();
+    }
+
+    public void getApplicationReport() {
+      LOG.info("Mocked: failed getApplicationReport call");
+      metrics.incrAppsFailedRetrieved();
+    }
+  }
+
+  /**
+   * Mock sub-cluster whose calls only record successes in the shared metrics.
+   * Each call uses the caller-supplied duration as its latency.
+   */
+  private static class MockGoodSubCluster {
+    public void getNewApplication(long duration) {
+      LOG.info("Mocked: successful getNewApplication call with duration {}",
+          duration);
+      metrics.succeededAppsCreated(duration);
+    }
+
+    public void submitApplication(long duration) {
+      LOG.info("Mocked: successful submitApplication call with duration {}",
+          duration);
+      metrics.succeededAppsSubmitted(duration);
+    }
+
+    public void forceKillApplication(long duration) {
+      LOG.info("Mocked: successful forceKillApplication call with duration {}",
+          duration);
+      metrics.succeededAppsKilled(duration);
+    }
+
+    public void getApplicationReport(long duration) {
+      LOG.info("Mocked: successful getApplicationReport call with duration {}",
+          duration);
+      metrics.succeededAppsRetrieved(duration);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index d918149..fb6cdd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -276,13 +276,11 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
AppState appState = new AppState("KILLED");
- try {
- interceptor.updateAppState(appState, null, appId.toString());
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().equals("Application " + appId + " does not exist"));
- }
+
+ Response response =
+ interceptor.updateAppState(appState, null, appId.toString());
+ Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org