You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 01:20:10 UTC
[5/8] hadoop git commit: YARN-5603. Metrics for Federation
StateStore. (Ellen Hui via asuresh)
YARN-5603. Metrics for Federation StateStore. (Ellen Hui via asuresh)
(cherry picked from commit 75abc9a8e2cf1c7d2c574ede720df59421512be3)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac090b38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac090b38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac090b38
Branch: refs/heads/branch-2
Commit: ac090b38ad54f78f59ec2ec0f73c6c4d7664d4cb
Parents: 261f769
Author: Arun Suresh <as...@apache.org>
Authored: Mon Aug 21 22:43:08 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:30 2017 -0700
----------------------------------------------------------------------
.../store/impl/SQLFederationStateStore.java | 79 ++++++++
.../FederationStateStoreClientMetrics.java | 184 +++++++++++++++++++
.../federation/store/metrics/package-info.java | 17 ++
.../TestFederationStateStoreClientMetrics.java | 146 +++++++++++++++
4 files changed, 426 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 63d8e42..533f9c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -72,6 +73,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +140,7 @@ public class SQLFederationStateStore implements FederationStateStore {
private String url;
private int maximumPoolSize;
private HikariDataSource dataSource = null;
+ private final Clock clock = new MonotonicClock();
@Override
public void init(Configuration conf) throws YarnException {
@@ -203,7 +207,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new subcluster into FederationStateStore
@@ -222,8 +228,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info(
"Registered the SubCluster " + subClusterId + " into the StateStore");
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to register the SubCluster " + subClusterId
+ " into the StateStore",
@@ -260,7 +269,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not deregister the subcluster into FederationStateStore
@@ -278,8 +289,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
+ state.toString());
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to deregister the sub-cluster " + subClusterId + " state to "
+ state.toString(),
@@ -317,7 +331,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not update the subcluster into FederationStateStore
@@ -336,8 +352,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Heartbeated the StateStore for the specified SubCluster "
+ subClusterId);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to heartbeat the StateStore for the specified SubCluster "
+ subClusterId,
@@ -378,7 +397,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
// Execute the query
+ long startTime = clock.getTime();
cstmt.execute();
+ long stopTime = clock.getTime();
String amRMAddress = cstmt.getString(2);
String clientRMAddress = cstmt.getString(3);
@@ -403,6 +424,9 @@ public class SQLFederationStateStore implements FederationStateStore {
clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
lastStartTime, capability);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
// Check if the output it is a valid subcluster
try {
FederationMembershipStateStoreInputValidator
@@ -417,6 +441,7 @@ public class SQLFederationStateStore implements FederationStateStore {
+ subClusterInfo.toString());
}
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the SubCluster information for " + subClusterId, e);
} finally {
@@ -439,7 +464,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
// Execute the query
+ long startTime = clock.getTime();
rs = cstmt.executeQuery();
+ long stopTime = clock.getTime();
while (rs.next()) {
@@ -459,6 +486,10 @@ public class SQLFederationStateStore implements FederationStateStore {
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
lastHeartBeat, state, lastStartTime, capability);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
+
// Check if the output it is a valid subcluster
try {
FederationMembershipStateStoreInputValidator
@@ -477,6 +508,7 @@ public class SQLFederationStateStore implements FederationStateStore {
}
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the SubClusters ", e);
} finally {
@@ -513,11 +545,16 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
subClusterHome = cstmt.getString(3);
SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
// For failover reason, we check the returned SubClusterId.
// If it is equal to the subclusterId we sent, the call added the new
// application into FederationStateStore. If the call returns a different
@@ -554,6 +591,7 @@ public class SQLFederationStateStore implements FederationStateStore {
}
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils
.logAndThrowRetriableException(LOG,
"Unable to insert the newly generated application "
@@ -592,7 +630,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not update the application into FederationStateStore
@@ -611,8 +651,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info(
"Update the SubCluster to {} for application {} in the StateStore",
subClusterId, appId);
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils
.logAndThrowRetriableException(LOG,
"Unable to update the application "
@@ -645,7 +688,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
// Execute the query
+ long startTime = clock.getTime();
cstmt.execute();
+ long stopTime = clock.getTime();
if (cstmt.getString(2) != null) {
homeRM = SubClusterId.newInstance(cstmt.getString(2));
@@ -659,7 +704,12 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.debug("Got the information about the specified application "
+ request.getApplicationId() + ". The AM is running in " + homeRM);
}
+
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the application information "
+ "for the specified application " + request.getApplicationId(),
@@ -688,7 +738,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
// Execute the query
+ long startTime = clock.getTime();
rs = cstmt.executeQuery();
+ long stopTime = clock.getTime();
while (rs.next()) {
@@ -701,7 +753,11 @@ public class SQLFederationStateStore implements FederationStateStore {
SubClusterId.newInstance(homeSubCluster)));
}
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the applications ", e);
} finally {
@@ -731,7 +787,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not delete the application from FederationStateStore
@@ -750,8 +808,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Delete from the StateStore the application: {}",
request.getApplicationId());
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to delete the application " + request.getApplicationId(), e);
} finally {
@@ -782,7 +843,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check if the output it is a valid policy
if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
@@ -798,7 +861,11 @@ public class SQLFederationStateStore implements FederationStateStore {
return null;
}
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to select the policy for the queue :" + request.getQueue(),
e);
@@ -833,7 +900,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
// Execute the query
+ long startTime = clock.getTime();
cstmt.executeUpdate();
+ long stopTime = clock.getTime();
// Check the ROWCOUNT value, if it is equal to 0 it means the call
// did not add a new policy into FederationStateStore
@@ -852,8 +921,11 @@ public class SQLFederationStateStore implements FederationStateStore {
LOG.info("Insert into the state store the policy for the queue: "
+ policyConf.getQueue());
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to insert the newly generated policy for the queue :"
+ policyConf.getQueue(),
@@ -880,7 +952,9 @@ public class SQLFederationStateStore implements FederationStateStore {
cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
// Execute the query
+ long startTime = clock.getTime();
rs = cstmt.executeQuery();
+ long stopTime = clock.getTime();
while (rs.next()) {
@@ -894,7 +968,12 @@ public class SQLFederationStateStore implements FederationStateStore {
ByteBuffer.wrap(policyInfo));
policyConfigurations.add(subClusterPolicyConfiguration);
}
+
+ FederationStateStoreClientMetrics
+ .succeededStateStoreCall(stopTime - startTime);
+
} catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the policy information for all the queues.", e);
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
new file mode 100644
index 0000000..27b46cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.metrics;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Performance metrics for FederationStateStore implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@Metrics(about = "Performance and usage metrics for Federation StateStore",
+ context = "fedr")
+public final class FederationStateStoreClientMetrics implements MetricsSource {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FederationStateStoreClientMetrics.class);
+
+ private static final MetricsRegistry REGISTRY =
+ new MetricsRegistry("FederationStateStoreClientMetrics");
+ private final static Method[] STATESTORE_API_METHODS =
+ FederationStateStore.class.getMethods();
+
+ // Map method names to counter objects
+ private static final Map<String, MutableCounterLong> API_TO_FAILED_CALLS =
+ new HashMap<String, MutableCounterLong>();
+ private static final Map<String, MutableRate> API_TO_SUCCESSFUL_CALLS =
+ new HashMap<String, MutableRate>();
+
+ // Provide quantile latency for each api call.
+ private static final Map<String, MutableQuantiles> API_TO_QUANTILE_METRICS =
+ new HashMap<String, MutableQuantiles>();
+
+ // Error string templates for logging calls from methods not in
+ // FederationStateStore API
+ private static final String UNKOWN_FAIL_ERROR_MSG =
+ "Not recording failed call for unknown FederationStateStore method {}";
+ private static final String UNKNOWN_SUCCESS_ERROR_MSG =
+ "Not recording successful call for unknown "
+ + "FederationStateStore method {}";
+
+ // Aggregate metrics are shared, and don't have to be looked up per call
+ @Metric("Total number of successful calls and latency(ms)")
+ private static MutableRate totalSucceededCalls;
+
+ @Metric("Total number of failed StateStore calls")
+ private static MutableCounterLong totalFailedCalls;
+
+ // This after the static members are initialized, or the constructor will
+ // throw a NullPointerException
+ private static final FederationStateStoreClientMetrics S_INSTANCE =
+ DefaultMetricsSystem.instance()
+ .register(new FederationStateStoreClientMetrics());
+
+ synchronized public static FederationStateStoreClientMetrics getInstance() {
+ return S_INSTANCE;
+ }
+
+ private FederationStateStoreClientMetrics() {
+ // Create the metrics for each method and put them into the map
+ for (Method m : STATESTORE_API_METHODS) {
+ String methodName = m.getName();
+ LOG.debug("Registering Federation StateStore Client metrics for {}",
+ methodName);
+
+ // This metric only records the number of failed calls; it does not
+ // capture latency information
+ API_TO_FAILED_CALLS.put(methodName,
+ REGISTRY.newCounter(methodName + "_numFailedCalls",
+ "# failed calls to " + methodName, 0L));
+
+ // This metric records both the number and average latency of successful
+ // calls.
+ API_TO_SUCCESSFUL_CALLS.put(methodName,
+ REGISTRY.newRate(methodName + "_successfulCalls",
+ "# successful calls and latency(ms) for" + methodName));
+
+ // This metric records the quantile-based latency of each successful call,
+ // re-sampled every 10 seconds.
+ API_TO_QUANTILE_METRICS.put(methodName,
+ REGISTRY.newQuantiles(methodName + "Latency",
+ "Quantile latency (ms) for " + methodName, "ops", "latency", 10));
+ }
+ }
+
+ public static void failedStateStoreCall() {
+ String methodName =
+ Thread.currentThread().getStackTrace()[2].getMethodName();
+ MutableCounterLong methodMetric = API_TO_FAILED_CALLS.get(methodName);
+ if (methodMetric == null) {
+ LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName);
+ return;
+ }
+
+ totalFailedCalls.incr();
+ methodMetric.incr();
+ }
+
+ public static void succeededStateStoreCall(long duration) {
+ String methodName =
+ Thread.currentThread().getStackTrace()[2].getMethodName();
+ MutableRate methodMetric = API_TO_SUCCESSFUL_CALLS.get(methodName);
+ MutableQuantiles methodQuantileMetric =
+ API_TO_QUANTILE_METRICS.get(methodName);
+ if (methodMetric == null || methodQuantileMetric == null) {
+ LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName);
+ return;
+ }
+
+ totalSucceededCalls.add(duration);
+ methodMetric.add(duration);
+ methodQuantileMetric.add(duration);
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
+ }
+
+ // Getters for unit testing
+ @VisibleForTesting
+ static long getNumFailedCallsForMethod(String methodName) {
+ return API_TO_FAILED_CALLS.get(methodName).value();
+ }
+
+ @VisibleForTesting
+ static long getNumSucceessfulCallsForMethod(String methodName) {
+ return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ static double getLatencySucceessfulCallsForMethod(String methodName) {
+ return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().mean();
+ }
+
+ @VisibleForTesting
+ static long getNumFailedCalls() {
+ return totalFailedCalls.value();
+ }
+
+ @VisibleForTesting
+ static long getNumSucceededCalls() {
+ return totalSucceededCalls.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ static double getLatencySucceededCalls() {
+ return totalSucceededCalls.lastStat().mean();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
new file mode 100644
index 0000000..eb548f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.metrics;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
new file mode 100644
index 0000000..241d5e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.metrics;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unittests for {@link FederationStateStoreClientMetrics}.
+ *
+ * <p>The metrics class attributes a call to the name of the method that
+ * invoked it, so the mock stores below expose a method called
+ * {@code registerSubCluster} to be counted as that API method, while calls
+ * made directly from a test method are expected to be ignored.
+ *
+ * <p>NOTE(review): the metrics under test are static, so counts appear to
+ * accumulate across test methods; {@link #testAggregateMetricInit()}
+ * presumably only holds when it runs before the tests that record calls -
+ * confirm the test ordering.
+ */
+public class TestFederationStateStoreClientMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationStateStoreClientMetrics.class);
+
+  private final MockBadFederationStateStore badStateStore =
+      new MockBadFederationStateStore();
+  private final MockGoodFederationStateStore goodStateStore =
+      new MockGoodFederationStateStore();
+
+  /** Aggregate counters start at zero. */
+  @Test
+  public void testAggregateMetricInit() {
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(0,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * Successful calls update both the aggregate and the per-method count and
+   * mean latency.
+   */
+  @Test
+  public void testSuccessfulCalls() {
+    LOG.info("Test: Aggregate and method successful calls updated correctly");
+
+    long totalGoodBefore =
+        FederationStateStoreClientMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster");
+
+    goodStateStore.registerSubCluster(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(100,
+        FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
+    // Check the per-method count, not the aggregate count again.
+    Assert.assertEquals(apiGoodBefore + 1, FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster"));
+    Assert.assertEquals(100, FederationStateStoreClientMetrics
+        .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
+
+    LOG.info("Test: Running stats correctly calculated for 2 metrics");
+
+    goodStateStore.registerSubCluster(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(150,
+        FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
+    // Check the per-method count, not the aggregate count again.
+    Assert.assertEquals(apiGoodBefore + 2, FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster"));
+    Assert.assertEquals(150, FederationStateStoreClientMetrics
+        .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
+
+  }
+
+  /** A failed call increments both the aggregate and per-method counters. */
+  @Test
+  public void testFailedCalls() {
+
+    long totalBadBefore = FederationStateStoreClientMetrics.getNumFailedCalls();
+    long apiBadBefore = FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster");
+
+    badStateStore.registerSubCluster();
+
+    LOG.info("Test: Aggregate and method failed calls updated correctly");
+    Assert.assertEquals(totalBadBefore + 1,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+    Assert.assertEquals(apiBadBefore + 1, FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster"));
+
+  }
+
+  /**
+   * Calls reported from a method whose name is not a FederationStateStore
+   * API method (here, this test method itself) must leave every counter
+   * unchanged.
+   */
+  @Test
+  public void testCallsUnknownMethod() {
+
+    long totalBadBefore = FederationStateStoreClientMetrics.getNumFailedCalls();
+    long apiBadBefore = FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster");
+    long totalGoodBefore =
+        FederationStateStoreClientMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster");
+
+    LOG.info("Calling Metrics class directly");
+    FederationStateStoreClientMetrics.failedStateStoreCall();
+    FederationStateStoreClientMetrics.succeededStateStoreCall(100);
+
+    LOG.info("Test: Aggregate and method calls did not update");
+    Assert.assertEquals(totalBadBefore,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+    Assert.assertEquals(apiBadBefore, FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster"));
+
+    Assert.assertEquals(totalGoodBefore,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(apiGoodBefore, FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster"));
+
+  }
+
+  // Records failures for all calls. Static: it needs no state from the
+  // enclosing test instance.
+  private static final class MockBadFederationStateStore {
+    public void registerSubCluster() {
+      LOG.info("Mocked: failed registerSubCluster call");
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+    }
+  }
+
+  // Records successes for all calls. Static: it needs no state from the
+  // enclosing test instance.
+  private static final class MockGoodFederationStateStore {
+    public void registerSubCluster(long duration) {
+      LOG.info("Mocked: successful registerSubCluster call with duration {}",
+          duration);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org