You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 16:37:17 UTC
[2/2] hadoop git commit: HDFS-12335. Federation Metrics. Contributed
by Inigo Goiri.
HDFS-12335. Federation Metrics. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d522007c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d522007c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d522007c
Branch: refs/heads/HDFS-10467
Commit: d522007c7b3f48cc826347b36b7854645f991f2f
Parents: da858ca
Author: Inigo Goiri <in...@apache.org>
Authored: Fri Sep 8 09:37:10 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Fri Sep 8 09:37:10 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 +
.../federation/metrics/FederationMBean.java | 204 ++++++
.../federation/metrics/FederationMetrics.java | 673 +++++++++++++++++++
.../federation/metrics/FederationRPCMBean.java | 90 +++
.../metrics/FederationRPCMetrics.java | 239 +++++++
.../FederationRPCPerformanceMonitor.java | 211 ++++++
.../federation/metrics/NamenodeBeanMetrics.java | 624 +++++++++++++++++
.../federation/metrics/StateStoreMBean.java | 45 ++
.../federation/metrics/StateStoreMetrics.java | 144 ++++
.../server/federation/metrics/package-info.java | 27 +
.../federation/router/ConnectionManager.java | 23 +
.../federation/router/ConnectionPool.java | 23 +
.../hdfs/server/federation/router/Router.java | 62 ++
.../server/federation/router/RouterMetrics.java | 73 ++
.../federation/router/RouterMetricsService.java | 108 +++
.../federation/router/RouterRpcClient.java | 39 +-
.../federation/router/RouterRpcMonitor.java | 95 +++
.../federation/router/RouterRpcServer.java | 63 +-
.../federation/store/CachedRecordStore.java | 8 +
.../federation/store/StateStoreService.java | 42 +-
.../store/driver/StateStoreDriver.java | 17 +-
.../driver/impl/StateStoreSerializableImpl.java | 6 +-
.../driver/impl/StateStoreZooKeeperImpl.java | 26 +
.../store/records/MembershipState.java | 2 +-
.../federation/store/records/MountTable.java | 23 +
.../records/impl/pb/MembershipStatePBImpl.java | 5 +-
.../src/main/resources/hdfs-default.xml | 19 +-
.../server/federation/FederationTestUtils.java | 13 +
.../server/federation/RouterConfigBuilder.java | 13 +
.../metrics/TestFederationMetrics.java | 237 +++++++
.../federation/metrics/TestMetricsBase.java | 150 +++++
.../server/federation/router/TestRouter.java | 23 +-
.../store/driver/TestStateStoreDriverBase.java | 69 ++
33 files changed, 3383 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5fd0811..91f0dab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFau
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
@@ -1149,6 +1151,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+ public static final String DFS_ROUTER_METRICS_ENABLE =
+ FEDERATION_ROUTER_PREFIX + "metrics.enable";
+ public static final boolean DFS_ROUTER_METRICS_ENABLE_DEFAULT = true;
+ public static final String DFS_ROUTER_METRICS_CLASS =
+ FEDERATION_ROUTER_PREFIX + "metrics.class";
+ public static final Class<? extends RouterRpcMonitor>
+ DFS_ROUTER_METRICS_CLASS_DEFAULT =
+ FederationRPCPerformanceMonitor.class;
+
// HDFS Router heartbeat
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java
new file mode 100644
index 0000000..43efb3c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * JMX interface for the federation statistics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FederationMBean {
+
+ /**
+ * Get information about all the namenodes in the federation or null if
+ * failure.
+ * @return JSON with all the Namenodes.
+ */
+ String getNamenodes();
+
+ /**
+ * Get the latest info for each registered nameservice.
+ * @return JSON with all the nameservices.
+ */
+ String getNameservices();
+
+ /**
+ * Get the mount table for the federated filesystem or null if failure.
+ * @return JSON with the mount table.
+ */
+ String getMountTable();
+
+ /**
+ * Get the total capacity of the federated cluster.
+ * @return Total capacity of the federated cluster.
+ */
+ long getTotalCapacity();
+
+ /**
+ * Get the used capacity of the federated cluster.
+ * @return Used capacity of the federated cluster.
+ */
+ long getUsedCapacity();
+
+ /**
+ * Get the remaining capacity of the federated cluster.
+ * @return Remaining capacity of the federated cluster.
+ */
+ long getRemainingCapacity();
+
+ /**
+ * Get the number of nameservices in the federation.
+ * @return Number of nameservices in the federation.
+ */
+ int getNumNameservices();
+
+ /**
+ * Get the number of namenodes.
+ * @return Number of namenodes.
+ */
+ int getNumNamenodes();
+
+ /**
+ * Get the number of expired namenodes.
+ * @return Number of expired namenodes.
+ */
+ int getNumExpiredNamenodes();
+
+ /**
+ * Get the number of live datanodes.
+ * @return Number of live datanodes.
+ */
+ int getNumLiveNodes();
+
+ /**
+ * Get the number of dead datanodes.
+ * @return Number of dead datanodes.
+ */
+ int getNumDeadNodes();
+
+ /**
+ * Get the number of decommissioning datanodes.
+ * @return Number of decommissioning datanodes.
+ */
+ int getNumDecommissioningNodes();
+
+ /**
+ * Get the number of live decommissioned datanodes.
+ * @return Number of live decommissioned datanodes.
+ */
+ int getNumDecomLiveNodes();
+
+ /**
+ * Get the number of dead decommissioned datanodes.
+ * @return Number of dead decommissioned datanodes.
+ */
+ int getNumDecomDeadNodes();
+
+ /**
+ * Get Max, Median, Min and Standard Deviation of DataNodes usage.
+ * @return the DataNode usage information, as a JSON string.
+ */
+ String getNodeUsage();
+
+ /**
+ * Get the number of blocks in the federation.
+ * @return Number of blocks in the federation.
+ */
+ long getNumBlocks();
+
+ /**
+ * Get the number of missing blocks in the federation.
+ * @return Number of missing blocks in the federation.
+ */
+ long getNumOfMissingBlocks();
+
+ /**
+ * Get the number of pending replication blocks in the federation.
+ * @return Number of pending replication blocks in the federation.
+ */
+ long getNumOfBlocksPendingReplication();
+
+ /**
+ * Get the number of under replicated blocks in the federation.
+ * @return Number of under replicated blocks in the federation.
+ */
+ long getNumOfBlocksUnderReplicated();
+
+ /**
+ * Get the number of pending deletion blocks in the federation.
+ * @return Number of pending deletion blocks in the federation.
+ */
+ long getNumOfBlocksPendingDeletion();
+
+ /**
+ * Get the number of files in the federation.
+ * @return Number of files in the federation.
+ */
+ long getNumFiles();
+
+ /**
+ * When the router started.
+ * @return Date as a string the router started.
+ */
+ String getRouterStarted();
+
+ /**
+ * Get the version of the router.
+ * @return Version of the router.
+ */
+ String getVersion();
+
+ /**
+ * Get the compilation date of the router.
+ * @return Compilation date of the router.
+ */
+ String getCompiledDate();
+
+ /**
+ * Get the compilation info of the router.
+ * @return Compilation info of the router.
+ */
+ String getCompileInfo();
+
+ /**
+ * Get the host and port of the router.
+ * @return Host and port of the router.
+ */
+ String getHostAndPort();
+
+ /**
+ * Get the identifier of the router.
+ * @return Identifier of the router.
+ */
+ String getRouterId();
+
+ /**
+ * Get the cluster identifiers of the nameservices in the federation.
+ * @return String listing the cluster identifiers in the federation.
+ */
+ String getClusterId();
+
+ /**
+ * Get the block pool identifiers of the nameservices in the federation.
+ * @return String listing the block pool identifiers in the federation.
+ */
+ String getBlockPoolId();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
new file mode 100644
index 0000000..1e80256
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.codehaus.jettison.json.JSONObject;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the Router metrics collector.
+ */
+public class FederationMetrics implements FederationMBean {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationMetrics.class);
+
+ /** Format for a date. */
+ private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";
+
+
+ /** Router interface. */
+ private final Router router;
+
+ /** FederationState JMX bean. */
+ private ObjectName beanName;
+
+ /** Resolve the namenode for each namespace. */
+ private final ActiveNamenodeResolver namenodeResolver;
+
+ /** State store. */
+ private final StateStoreService stateStore;
+ /** Membership state store. */
+ private MembershipStore membershipStore;
+ /** Mount table store. */
+ private MountTableStore mountTableStore;
+
+
+ /**
+ * Create the federation metrics collector and register it as the
+ * "FederationState" JMX bean of the "Router" service.
+ * If the Router has no State Store, an error is logged and the membership
+ * and mount table stores are left unset (null).
+ *
+ * @param router Router whose federation state is exposed.
+ * @throws IOException Declared by the signature; the statements visible
+ * here do not throw it directly.
+ */
+ public FederationMetrics(Router router) throws IOException {
+ this.router = router;
+
+ try {
+ StandardMBean bean = new StandardMBean(this, FederationMBean.class);
+ this.beanName = MBeans.register("Router", "FederationState", bean);
+ LOG.info("Registered Router MBean: {}", this.beanName);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad Router MBean setup", e);
+ }
+
+ // Resolve namenode for each nameservice
+ this.namenodeResolver = this.router.getNamenodeResolver();
+
+ // State store interfaces
+ this.stateStore = this.router.getStateStore();
+ if (this.stateStore == null) {
+ LOG.error("State store not available");
+ } else {
+ this.membershipStore = stateStore.getRegisteredRecordStore(
+ MembershipStore.class);
+ this.mountTableStore = stateStore.getRegisteredRecordStore(
+ MountTableStore.class);
+ }
+ }
+
+ /**
+ * Unregister the JMX beans.
+ */
+ public void close() {
+ if (this.beanName != null) {
+ MBeans.unregister(beanName);
+ }
+ }
+
+ @Override
+ public String getNamenodes() {
+ final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
+ try {
+ // Get the values from the store
+ GetNamenodeRegistrationsRequest request =
+ GetNamenodeRegistrationsRequest.newInstance();
+ GetNamenodeRegistrationsResponse response =
+ membershipStore.getNamenodeRegistrations(request);
+
+ // Order the namenodes
+ final List<MembershipState> namenodes = response.getNamenodeMemberships();
+ if (namenodes == null || namenodes.size() == 0) {
+ return JSON.toString(info);
+ }
+ List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
+ Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);
+
+ // Dump namenodes information into JSON
+ for (MembershipState namenode : namenodesOrder) {
+ Map<String, Object> innerInfo = new HashMap<>();
+ Map<String, Object> map = getJson(namenode);
+ innerInfo.putAll(map);
+ long dateModified = namenode.getDateModified();
+ long lastHeartbeat = getSecondsSince(dateModified);
+ innerInfo.put("lastHeartbeat", lastHeartbeat);
+ MembershipStats stats = namenode.getStats();
+ long used = stats.getTotalSpace() - stats.getAvailableSpace();
+ innerInfo.put("used", used);
+ info.put(namenode.getNamenodeKey(),
+ Collections.unmodifiableMap(innerInfo));
+ }
+ } catch (IOException e) {
+ LOG.error("Enable to fetch json representation of namenodes {}",
+ e.getMessage());
+ return "{}";
+ }
+ return JSON.toString(info);
+ }
+
+ /**
+ * Build a JSON map with the registration that
+ * getActiveNamenodeRegistrations() selects for each nameservice, keyed by
+ * namenode key and sorted with MembershipState.NAME_COMPARATOR. Each entry
+ * carries the record fields plus "lastHeartbeat" and "used".
+ *
+ * @return JSON with the nameservices; "{}" if they cannot be retrieved.
+ */
+ @Override
+ public String getNameservices() {
+ final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
+ try {
+ final List<MembershipState> namenodes = getActiveNamenodeRegistrations();
+ List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
+ Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);
+
+ // Dump namenodes information into JSON
+ for (MembershipState namenode : namenodesOrder) {
+ Map<String, Object> innerInfo = new HashMap<>();
+ Map<String, Object> map = getJson(namenode);
+ innerInfo.putAll(map);
+ long dateModified = namenode.getDateModified();
+ // Seconds elapsed since the record was last modified
+ long lastHeartbeat = getSecondsSince(dateModified);
+ innerInfo.put("lastHeartbeat", lastHeartbeat);
+ MembershipStats stats = namenode.getStats();
+ // Used space is derived: total minus available
+ long used = stats.getTotalSpace() - stats.getAvailableSpace();
+ innerInfo.put("used", used);
+ info.put(namenode.getNamenodeKey(),
+ Collections.unmodifiableMap(innerInfo));
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage());
+ return "{}";
+ }
+ return JSON.toString(info);
+ }
+
+ /**
+  * Build a JSON list with every mount table entry under "/", sorted with
+  * {@code MountTable.SOURCE_COMPARATOR}. Each entry carries the record
+  * fields, formatted creation/modification dates, the comma-separated
+  * destination nameservices and paths, the destination order (only when
+  * there is more than one nameservice) and the read-only flag.
+  *
+  * @return JSON with the mount table; "[]" if the mount table store is
+  *         unavailable or cannot be queried.
+  */
+ @Override
+ public String getMountTable() {
+   // The constructor leaves the store unset when there is no State Store;
+   // report nothing instead of failing the JMX query with an NPE
+   if (mountTableStore == null) {
+     return "[]";
+   }
+
+   final List<Map<String, Object>> info = new LinkedList<>();
+   try {
+     // Get all the mount points in order
+     GetMountTableEntriesRequest request =
+         GetMountTableEntriesRequest.newInstance("/");
+     GetMountTableEntriesResponse response =
+         mountTableStore.getMountTableEntries(request);
+     final List<MountTable> mounts = response.getEntries();
+     List<MountTable> orderedMounts = new ArrayList<>(mounts);
+     Collections.sort(orderedMounts, MountTable.SOURCE_COMPARATOR);
+
+     // Dump mount table entries information into JSON
+     for (MountTable entry : orderedMounts) {
+       // Summarize destinations
+       Set<String> nameservices = new LinkedHashSet<>();
+       Set<String> paths = new LinkedHashSet<>();
+       for (RemoteLocation location : entry.getDestinations()) {
+         nameservices.add(location.getNameserviceId());
+         paths.add(location.getDest());
+       }
+
+       // getJson() returns a fresh map, so it can be extended in place
+       Map<String, Object> innerInfo = getJson(entry);
+       // We add some values with a cleaner format
+       innerInfo.put("dateCreated", getDateString(entry.getDateCreated()));
+       innerInfo.put("dateModified", getDateString(entry.getDateModified()));
+       innerInfo.put("nameserviceId", StringUtils.join(",", nameservices));
+       innerInfo.put("path", StringUtils.join(",", paths));
+       // The order only matters when there are several nameservices
+       if (nameservices.size() > 1) {
+         innerInfo.put("order", entry.getDestOrder().toString());
+       } else {
+         innerInfo.put("order", "");
+       }
+       innerInfo.put("readonly", entry.isReadOnly());
+       info.add(Collections.unmodifiableMap(innerInfo));
+     }
+   } catch (IOException e) {
+     LOG.error(
+         "Cannot generate JSON of mount table from store: {}", e.getMessage());
+     return "[]";
+   }
+   return JSON.toString(info);
+ }
+
+ @Override
+ public long getTotalCapacity() {
+ return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
+ }
+
+ @Override
+ public long getRemainingCapacity() {
+ return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace);
+ }
+
+ @Override
+ public long getUsedCapacity() {
+ return getTotalCapacity() - getRemainingCapacity();
+ }
+
+ @Override
+ public int getNumNameservices() {
+ try {
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ return nss.size();
+ } catch (IOException e) {
+ LOG.error(
+ "Cannot fetch number of expired registrations from the store: {}",
+ e.getMessage());
+ return 0;
+ }
+ }
+
+ /**
+  * Get the number of namenode registrations in the membership store.
+  *
+  * @return Number of namenodes; 0 if the membership store is unavailable,
+  *         returns no list or cannot be queried.
+  */
+ @Override
+ public int getNumNamenodes() {
+   // The constructor leaves the store unset when there is no State Store
+   if (membershipStore == null) {
+     return 0;
+   }
+   try {
+     GetNamenodeRegistrationsRequest request =
+         GetNamenodeRegistrationsRequest.newInstance();
+     GetNamenodeRegistrationsResponse response =
+         membershipStore.getNamenodeRegistrations(request);
+     List<MembershipState> memberships = response.getNamenodeMemberships();
+     // getNamenodes() treats a null list as "no registrations"; do the same
+     return memberships == null ? 0 : memberships.size();
+   } catch (IOException e) {
+     LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage());
+     return 0;
+   }
+ }
+
+ /**
+  * Get the number of expired namenode registrations in the membership
+  * store.
+  *
+  * @return Number of expired namenodes; 0 if the membership store is
+  *         unavailable, returns no list or cannot be queried.
+  */
+ @Override
+ public int getNumExpiredNamenodes() {
+   // The constructor leaves the store unset when there is no State Store
+   if (membershipStore == null) {
+     return 0;
+   }
+   try {
+     GetNamenodeRegistrationsRequest request =
+         GetNamenodeRegistrationsRequest.newInstance();
+     GetNamenodeRegistrationsResponse response =
+         membershipStore.getExpiredNamenodeRegistrations(request);
+     List<MembershipState> expiredMemberships =
+         response.getNamenodeMemberships();
+     // getNamenodes() treats a null list as "no registrations"; do the same
+     return expiredMemberships == null ? 0 : expiredMemberships.size();
+   } catch (IOException e) {
+     LOG.error(
+         "Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage());
+     return 0;
+   }
+ }
+
+ @Override
+ public int getNumLiveNodes() {
+ return getNameserviceAggregatedInt(
+ MembershipStats::getNumOfActiveDatanodes);
+ }
+
+ @Override
+ public int getNumDeadNodes() {
+ return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes);
+ }
+
+ @Override
+ public int getNumDecommissioningNodes() {
+ return getNameserviceAggregatedInt(
+ MembershipStats::getNumOfDecommissioningDatanodes);
+ }
+
+ @Override
+ public int getNumDecomLiveNodes() {
+ return getNameserviceAggregatedInt(
+ MembershipStats::getNumOfDecomActiveDatanodes);
+ }
+
+ @Override
+ public int getNumDecomDeadNodes() {
+ return getNameserviceAggregatedInt(
+ MembershipStats::getNumOfDecomDeadDatanodes);
+ }
+
+ /**
+ * Compute the minimum, median, maximum and standard deviation of the DFS
+ * used percentage across the live datanodes reported by the Router RPC
+ * server. If the report is empty or cannot be fetched, all four values stay
+ * at 0.
+ *
+ * @return JSON object with a single "nodeUsage" entry holding "min",
+ * "median", "max" and "stdDev", each formatted as a percentage with two
+ * decimals.
+ */
+ @Override // FederationMBean
+ public String getNodeUsage() {
+ float median = 0;
+ float max = 0;
+ float min = 0;
+ float dev = 0;
+
+ final Map<String, Map<String, Object>> info = new HashMap<>();
+ try {
+ RouterRpcServer rpcServer = this.router.getRpcServer();
+ DatanodeInfo[] live =
+ rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
+
+ if (live.length > 0) {
+ float totalDfsUsed = 0;
+ float[] usages = new float[live.length];
+ int i = 0;
+ for (DatanodeInfo dn : live) {
+ usages[i++] = dn.getDfsUsedPercent();
+ totalDfsUsed += dn.getDfsUsedPercent();
+ }
+ // From here on totalDfsUsed holds the mean usage, not the sum
+ totalDfsUsed /= live.length;
+ Arrays.sort(usages);
+ // For an even number of nodes this picks the upper middle element
+ median = usages[usages.length / 2];
+ max = usages[usages.length - 1];
+ min = usages[0];
+
+ // Accumulate the squared deviations from the mean
+ for (i = 0; i < usages.length; i++) {
+ dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed);
+ }
+ // Population standard deviation (divides by N)
+ dev = (float) Math.sqrt(dev / usages.length);
+ }
+ } catch (IOException e) {
+ // Best effort: the zeroed statistics are still reported below
+ LOG.info("Cannot get the live nodes: {}", e.getMessage());
+ }
+
+ final Map<String, Object> innerInfo = new HashMap<>();
+ innerInfo.put("min", StringUtils.format("%.2f%%", min));
+ innerInfo.put("median", StringUtils.format("%.2f%%", median));
+ innerInfo.put("max", StringUtils.format("%.2f%%", max));
+ innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev));
+ info.put("nodeUsage", innerInfo);
+
+ return JSON.toString(info);
+ }
+
+ @Override
+ public long getNumBlocks() {
+ return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks);
+ }
+
+ @Override
+ public long getNumOfMissingBlocks() {
+ return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing);
+ }
+
+ @Override
+ public long getNumOfBlocksPendingReplication() {
+ return getNameserviceAggregatedLong(
+ MembershipStats::getNumOfBlocksPendingReplication);
+ }
+
+ @Override
+ public long getNumOfBlocksUnderReplicated() {
+ return getNameserviceAggregatedLong(
+ MembershipStats::getNumOfBlocksUnderReplicated);
+ }
+
+ @Override
+ public long getNumOfBlocksPendingDeletion() {
+ return getNameserviceAggregatedLong(
+ MembershipStats::getNumOfBlocksPendingDeletion);
+ }
+
+ @Override
+ public long getNumFiles() {
+ return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles);
+ }
+
+ @Override
+ public String getRouterStarted() {
+ long startTime = this.router.getStartTime();
+ return new Date(startTime).toString();
+ }
+
+ @Override
+ public String getVersion() {
+ return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
+ }
+
+ @Override
+ public String getCompiledDate() {
+ return VersionInfo.getDate();
+ }
+
+ @Override
+ public String getCompileInfo() {
+ return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
+ + VersionInfo.getBranch();
+ }
+
+ @Override
+ public String getHostAndPort() {
+ // TODO this should be the HTTP address
+ return "Unknown";
+ }
+
+ @Override
+ public String getRouterId() {
+ return this.router.getRouterId();
+ }
+
+ @Override
+ public String getClusterId() {
+ try {
+ Collection<String> clusterIds =
+ getNamespaceInfo(FederationNamespaceInfo::getClusterId);
+ return clusterIds.toString();
+ } catch (IOException e) {
+ LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage());
+ return "";
+ }
+ }
+
+ @Override
+ public String getBlockPoolId() {
+ try {
+ Collection<String> blockpoolIds =
+ getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId);
+ return blockpoolIds.toString();
+ } catch (IOException e) {
+ LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage());
+ return "";
+ }
+ }
+
+ /**
+  * Build a set of unique values found in all namespaces.
+  *
+  * @param f Method reference of the appropriate FederationNamespaceInfo
+  *          getter function
+  * @return Set of unique string values found in all discovered namespaces;
+  *         empty if the membership store is not available.
+  * @throws IOException if the query could not be executed.
+  */
+ private Collection<String> getNamespaceInfo(
+     Function<FederationNamespaceInfo, String> f) throws IOException {
+   // The constructor leaves the store unset when there is no State Store
+   if (membershipStore == null) {
+     return Collections.emptySet();
+   }
+   GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+   GetNamespaceInfoResponse response =
+       membershipStore.getNamespaceInfo(request);
+   return response.getNamespaceInfo().stream()
+       .map(f)
+       .collect(Collectors.toSet());
+ }
+
+ /**
+  * Sum an integer statistic over the registration selected for each
+  * nameservice.
+  * @param f Getter that extracts the statistic from the membership stats.
+  * @return Sum of the statistic, or 0 if the registrations cannot be
+  *         fetched.
+  */
+ private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) {
+   try {
+     List<MembershipState> registrations = getActiveNamenodeRegistrations();
+     return registrations.stream()
+         .map(MembershipState::getStats)
+         .mapToInt(f)
+         .sum();
+   } catch (IOException e) {
+     LOG.error("Unable to extract metrics: {}", e.getMessage());
+     return 0;
+   }
+ }
+
+ /**
+  * Sum a long statistic over the registration selected for each
+  * nameservice.
+  * @param f Getter that extracts the statistic from the membership stats.
+  * @return Sum of the statistic, or 0 if the registrations cannot be
+  *         fetched.
+  */
+ private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) {
+   try {
+     List<MembershipState> registrations = getActiveNamenodeRegistrations();
+     return registrations.stream()
+         .map(MembershipState::getStats)
+         .mapToLong(f)
+         .sum();
+   } catch (IOException e) {
+     LOG.error("Unable to extract metrics: {}", e.getMessage());
+     return 0;
+   }
+ }
+
+ /**
+  * Fetches the most active namenode membership for each known nameservice:
+  * the first entry the namenode resolver returns for that nameservice.
+  * The fetched membership may or may not be active. Excludes expired
+  * memberships.
+  * @throws IOException if the query could not be performed.
+  * @return List of the most active NNs from each known nameservice; empty
+  *         if the membership store is not available.
+  */
+ private List<MembershipState> getActiveNamenodeRegistrations()
+     throws IOException {
+
+   List<MembershipState> resultList = new ArrayList<>();
+   // The constructor leaves the store unset when there is no State Store
+   if (membershipStore == null) {
+     return resultList;
+   }
+   GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+   GetNamespaceInfoResponse response =
+       membershipStore.getNamespaceInfo(request);
+   for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) {
+     // Fetch the most recent namenode registration
+     String nsId = nsInfo.getNameserviceId();
+     List<? extends FederationNamenodeContext> nns =
+         namenodeResolver.getNamenodesForNameserviceId(nsId);
+     // A nameservice without namenodes is skipped; get(0) on an empty list
+     // would throw IndexOutOfBoundsException
+     if (nns != null && !nns.isEmpty()) {
+       FederationNamenodeContext nn = nns.get(0);
+       // instanceof is already false for null
+       if (nn instanceof MembershipState) {
+         resultList.add((MembershipState) nn);
+       }
+     }
+   }
+   return resultList;
+ }
+
+ /**
+ * Get time as a date string.
+ * @param time Milliseconds since 1970 (the value is passed to
+ * {@code new Date(long)}).
+ * @return String representing the date in the DATE_FORMAT pattern, or "-"
+ * if the time is zero or negative.
+ */
+ private static String getDateString(long time) {
+ if (time <= 0) {
+ return "-";
+ }
+ Date date = new Date(time);
+
+ // A new formatter is created on every call
+ SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+ return sdf.format(date);
+ }
+
+ /**
+ * Get the number of seconds passed since a date.
+ *
+ * @param timeMs to use as a reference.
+ * @return Seconds since the date.
+ */
+ private static long getSecondsSince(long timeMs) {
+ if (timeMs < 0) {
+ return -1;
+ }
+ return (now() - timeMs) / 1000;
+ }
+
+ /**
+  * Get JSON for this record. Every field reported by getFields() except
+  * "proto" is read through its getter; values that are records themselves
+  * are flattened into the same map and null values are stored as
+  * {@code JSONObject.NULL}.
+  *
+  * @param record Record to convert.
+  * @return Map representing the data for the JSON representation.
+  * @throws IllegalArgumentException If a field cannot be serialized; the
+  *         original failure is attached as the cause.
+  */
+ private static Map<String, Object> getJson(BaseRecord record) {
+   Map<String, Object> json = new HashMap<>();
+
+   for (String fieldName : getFields(record).keySet()) {
+     // The protobuf backing object is not part of the JSON representation
+     if (fieldName.equalsIgnoreCase("proto")) {
+       continue;
+     }
+     try {
+       Object value = getField(record, fieldName);
+       if (value instanceof BaseRecord) {
+         BaseRecord recordField = (BaseRecord) value;
+         json.putAll(getJson(recordField));
+       } else {
+         json.put(fieldName, value == null ? JSONObject.NULL : value);
+       }
+     } catch (Exception e) {
+       // Keep the cause so the failing getter can be diagnosed
+       throw new IllegalArgumentException(
+           "Cannot serialize field " + fieldName + " into JSON", e);
+     }
+   }
+   return json;
+ }
+
+ /**
+ * Returns all serializable fields in the object. A field is derived from
+ * every method declared by the record's own class whose name starts with
+ * "get": the prefix is dropped and the first remaining character is
+ * lower-cased.
+ *
+ * @param record Record to inspect.
+ * @return Map from field name to the return type of its getter.
+ */
+ private static Map<String, Class<?>> getFields(BaseRecord record) {
+ Map<String, Class<?>> getters = new HashMap<>();
+ // Only methods declared by the concrete class, not inherited ones
+ for (Method m : record.getClass().getDeclaredMethods()) {
+ if (m.getName().startsWith("get")) {
+ try {
+ Class<?> type = m.getReturnType();
+ char[] c = m.getName().substring(3).toCharArray();
+ // A method named exactly "get" leaves c empty; the resulting
+ // exception is caught and logged below
+ c[0] = Character.toLowerCase(c[0]);
+ String key = new String(c);
+ getters.put(key, type);
+ } catch (Exception e) {
+ LOG.error("Cannot execute getter {} on {}", m.getName(), record);
+ }
+ }
+ }
+ return getters;
+ }
+
+ /**
+  * Fetches the value for a field name by invoking the matching getter
+  * without arguments.
+  *
+  * @param record Record to read the field from.
+  * @param fieldName the legacy name of the field.
+  * @return The field data or null if the getter is not found or fails.
+  */
+ private static Object getField(BaseRecord record, String fieldName) {
+   Object result = null;
+   Method m = locateGetter(record, fieldName);
+   if (m != null) {
+     try {
+       result = m.invoke(record);
+     } catch (Exception e) {
+       // Best effort: report the failure with its cause and return null
+       LOG.error("Cannot get field {} on {}", fieldName, record, e);
+     }
+   }
+   return result;
+ }
+
+ /**
+ * Finds the appropriate getter for a field name. The public methods of the
+ * record (including inherited ones) are scanned and the first one whose
+ * name equals "get" + fieldName, ignoring case, is returned.
+ *
+ * @param record Record whose class is searched.
+ * @param fieldName The legacy name of the field.
+ * @return The matching getter or null if not found.
+ */
+ private static Method locateGetter(BaseRecord record, String fieldName) {
+ for (Method m : record.getClass().getMethods()) {
+ // Only the name is compared; the parameter list is not checked
+ if (m.getName().equalsIgnoreCase("get" + fieldName)) {
+ return m;
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
new file mode 100644
index 0000000..00209e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * JMX interface for the RPC server.
+ * TODO use the default RPC MBean.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FederationRPCMBean {
+
+ long getProxyOps();
+
+ double getProxyAvg();
+
+ long getProcessingOps();
+
+ double getProcessingAvg();
+
+ long getProxyOpFailureCommunicate();
+
+ long getProxyOpFailureStandby();
+
+ long getProxyOpNotImplemented();
+
+ long getRouterFailureStateStoreOps();
+
+ long getRouterFailureReadOnlyOps();
+
+ long getRouterFailureLockedOps();
+
+ long getRouterFailureSafemodeOps();
+
+ int getRpcServerCallQueue();
+
+ /**
+ * Get the number of RPC connections between the clients and the Router.
+ * @return Number of RPC connections between the clients and the Router.
+ */
+ int getRpcServerNumOpenConnections();
+
+ /**
+ * Get the number of RPC connections between the Router and the NNs.
+ * @return Number of RPC connections between the Router and the NNs.
+ */
+ int getRpcClientNumConnections();
+
+ /**
+ * Get the number of active RPC connections between the Router and the NNs.
+ * @return Number of active RPC connections between the Router and the NNs.
+ */
+ int getRpcClientNumActiveConnections();
+
+ /**
+ * Get the number of RPC connections to be created.
+ * @return Number of RPC connections to be created.
+ */
+ int getRpcClientNumCreatingConnections();
+
+ /**
+ * Get the number of connection pools between the Router and the NNs.
+ * @return Number of connection pools between the Router and the NNs.
+ */
+ int getRpcClientNumConnectionPools();
+
+ /**
+ * JSON representation of the RPC connections from the Router to the NNs.
+ * @return JSON string representation.
+ */
+ String getRpcClientConnections();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
new file mode 100644
index 0000000..427bca2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * Implementation of the RPC metrics collector.
+ */
+@Metrics(name = "RouterRPCActivity", about = "Router RPC Activity",
+ context = "router")
+public class FederationRPCMetrics implements FederationRPCMBean {
+
+ private final MetricsRegistry registry = new MetricsRegistry("router");
+
+ private RouterRpcServer rpcServer;
+
+ @Metric("Time for the router to process an operation internally")
+ private MutableRate processing;
+ @Metric("Number of operations the Router processed internally")
+ private MutableCounterLong processingOp;
+ @Metric("Time for the Router to proxy an operation to the Namenodes")
+ private MutableRate proxy;
+ @Metric("Number of operations the Router proxied to a Namenode")
+ private MutableCounterLong proxyOp;
+
+ @Metric("Number of operations to hit a standby NN")
+ private MutableCounterLong proxyOpFailureStandby;
+ @Metric("Number of operations to fail to reach NN")
+ private MutableCounterLong proxyOpFailureCommunicate;
+ @Metric("Number of operations not implemented")
+ private MutableCounterLong proxyOpNotImplemented;
+
+ @Metric("Failed requests due to State Store unavailable")
+ private MutableCounterLong routerFailureStateStore;
+ @Metric("Failed requests due to read only mount point")
+ private MutableCounterLong routerFailureReadOnly;
+ @Metric("Failed requests due to locked path")
+ private MutableCounterLong routerFailureLocked;
+ @Metric("Failed requests due to safe mode")
+ private MutableCounterLong routerFailureSafemode;
+
+ public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) {
+ this.rpcServer = rpcServer;
+
+ registry.tag(SessionId, "RouterRPCSession");
+ registry.tag(ProcessName, "Router");
+ }
+
+ public static FederationRPCMetrics create(Configuration conf,
+ RouterRpcServer rpcServer) {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(FederationRPCMetrics.class.getName(),
+ "HDFS Federation RPC Metrics",
+ new FederationRPCMetrics(conf, rpcServer));
+ }
+
+ /**
+ * Convert nanoseconds to milliseconds.
+ * @param ns Time in nanoseconds.
+ * @return Time in milliseconds.
+ */
+ private static double toMs(double ns) {
+ return ns / 1000000;
+ }
+
+ /**
+ * Reset the metrics system.
+ */
+ public static void reset() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(FederationRPCMetrics.class.getName());
+ }
+
+ public void incrProxyOpFailureStandby() {
+ proxyOpFailureStandby.incr();
+ }
+
+ @Override
+ public long getProxyOpFailureStandby() {
+ return proxyOpFailureStandby.value();
+ }
+
+ public void incrProxyOpFailureCommunicate() {
+ proxyOpFailureCommunicate.incr();
+ }
+
+ @Override
+ public long getProxyOpFailureCommunicate() {
+ return proxyOpFailureCommunicate.value();
+ }
+
+
+ public void incrProxyOpNotImplemented() {
+ proxyOpNotImplemented.incr();
+ }
+
+ @Override
+ public long getProxyOpNotImplemented() {
+ return proxyOpNotImplemented.value();
+ }
+
+ public void incrRouterFailureStateStore() {
+ routerFailureStateStore.incr();
+ }
+
+ @Override
+ public long getRouterFailureStateStoreOps() {
+ return routerFailureStateStore.value();
+ }
+
+ public void incrRouterFailureSafemode() {
+ routerFailureSafemode.incr();
+ }
+
+ @Override
+ public long getRouterFailureSafemodeOps() {
+ return routerFailureSafemode.value();
+ }
+
+ public void incrRouterFailureReadOnly() {
+ routerFailureReadOnly.incr();
+ }
+
+ @Override
+ public long getRouterFailureReadOnlyOps() {
+ return routerFailureReadOnly.value();
+ }
+
+ public void incrRouterFailureLocked() {
+ routerFailureLocked.incr();
+ }
+
+ @Override
+ public long getRouterFailureLockedOps() {
+ return routerFailureLocked.value();
+ }
+
+ @Override
+ public int getRpcServerCallQueue() {
+ return rpcServer.getServer().getCallQueueLen();
+ }
+
+ @Override
+ public int getRpcServerNumOpenConnections() {
+ return rpcServer.getServer().getNumOpenConnections();
+ }
+
+ @Override
+ public int getRpcClientNumConnections() {
+ return rpcServer.getRPCClient().getNumConnections();
+ }
+
+ @Override
+ public int getRpcClientNumActiveConnections() {
+ return rpcServer.getRPCClient().getNumActiveConnections();
+ }
+
+ @Override
+ public int getRpcClientNumCreatingConnections() {
+ return rpcServer.getRPCClient().getNumCreatingConnections();
+ }
+
+ @Override
+ public int getRpcClientNumConnectionPools() {
+ return rpcServer.getRPCClient().getNumConnectionPools();
+ }
+
+ @Override
+ public String getRpcClientConnections() {
+ return rpcServer.getRPCClient().getJSON();
+ }
+
+ /**
+ * Add the time to proxy an operation from the moment the Router sends it to
+ * the Namenode until it replied.
+ * @param time Proxy time of an operation in nanoseconds.
+ */
+ public void addProxyTime(long time) {
+ proxy.add(time);
+ proxyOp.incr();
+ }
+
+ @Override
+ public double getProxyAvg() {
+ return toMs(proxy.lastStat().mean());
+ }
+
+ @Override
+ public long getProxyOps() {
+ return proxyOp.value();
+ }
+
+ /**
+ * Add the time to process a request in the Router from the time we receive
+ * the call until we send it to the Namenode.
+ * @param time Process time of an operation in nanoseconds.
+ */
+ public void addProcessingTime(long time) {
+ processing.add(time);
+ processingOp.incr();
+ }
+
+ @Override
+ public double getProcessingAvg() {
+ return toMs(processing.lastStat().mean());
+ }
+
+ @Override
+ public long getProcessingOps() {
+ return processingOp.value();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
new file mode 100644
index 0000000..e3a16b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Customizable RPC performance monitor. Receives events from the RPC server
+ * and aggregates them via JMX.
+ */
+public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationRPCPerformanceMonitor.class);
+
+
+ /** Time for an operation to be received in the Router. */
+ private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
+ /** Time for an operation to be sent to the Namenode. */
+ private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();
+
+ /** Configuration for the performance monitor. */
+ private Configuration conf;
+ /** RPC server for the Router. */
+ private RouterRpcServer server;
+ /** State Store. */
+ private StateStoreService store;
+
+ /** JMX interface to monitor the RPC metrics. */
+ private FederationRPCMetrics metrics;
+ private ObjectName registeredBean;
+
+ /** Thread pool for logging stats. */
+ private ExecutorService executor;
+
+
+ @Override
+ public void init(Configuration configuration, RouterRpcServer rpcServer,
+ StateStoreService stateStore) {
+
+ this.conf = configuration;
+ this.server = rpcServer;
+ this.store = stateStore;
+
+ // Create metrics
+ this.metrics = FederationRPCMetrics.create(conf, server);
+
+ // Create thread pool
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Federation RPC Performance Monitor-%d").build();
+ this.executor = Executors.newFixedThreadPool(1, threadFactory);
+
+ // Adding JMX interface
+ try {
+ StandardMBean bean =
+ new StandardMBean(this.metrics, FederationRPCMBean.class);
+ registeredBean = MBeans.register("Router", "FederationRPC", bean);
+ LOG.info("Registered FederationRPCMBean: {}", registeredBean);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad FederationRPCMBean setup", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (registeredBean != null) {
+ MBeans.unregister(registeredBean);
+ registeredBean = null;
+ }
+ if (this.executor != null) {
+ this.executor.shutdown();
+ }
+ }
+
+ /**
+  * Resets all RPC service performance counters to their defaults.
+  * The JMX bean and the metrics source are unregistered and the previous
+  * thread pool is shut down before the monitor is initialized again with
+  * the same configuration, RPC server and State Store.
+  */
+ public void resetPerfCounters() {
+   if (registeredBean != null) {
+     MBeans.unregister(registeredBean);
+     registeredBean = null;
+   }
+   if (metrics != null) {
+     FederationRPCMetrics.reset();
+     metrics = null;
+   }
+   // init() creates a new executor; stop the old one so it is not leaked
+   if (executor != null) {
+     executor.shutdown();
+     executor = null;
+   }
+   init(conf, server, store);
+ }
+
+ @Override
+ public void startOp() {
+ START_TIME.set(this.getNow());
+ }
+
+ @Override
+ public long proxyOp() {
+ PROXY_TIME.set(this.getNow());
+ long processingTime = getProcessingTime();
+ if (processingTime >= 0) {
+ metrics.addProcessingTime(processingTime);
+ }
+ return Thread.currentThread().getId();
+ }
+
+ @Override
+ public void proxyOpComplete(boolean success) {
+ if (success) {
+ long proxyTime = getProxyTime();
+ if (proxyTime >= 0) {
+ metrics.addProxyTime(proxyTime);
+ }
+ }
+ }
+
+ @Override
+ public void proxyOpFailureStandby() {
+ metrics.incrProxyOpFailureStandby();
+ }
+
+ @Override
+ public void proxyOpFailureCommunicate() {
+ metrics.incrProxyOpFailureCommunicate();
+ }
+
+ @Override
+ public void proxyOpNotImplemented() {
+ metrics.incrProxyOpNotImplemented();
+ }
+
+ @Override
+ public void routerFailureStateStore() {
+ metrics.incrRouterFailureStateStore();
+ }
+
+ @Override
+ public void routerFailureSafemode() {
+ metrics.incrRouterFailureSafemode();
+ }
+
+ @Override
+ public void routerFailureReadOnly() {
+ metrics.incrRouterFailureReadOnly();
+ }
+
+ @Override
+ public void routerFailureLocked() {
+ metrics.incrRouterFailureLocked();
+ }
+
+ /**
+ * Get current time.
+ * @return Current time in nanoseconds.
+ */
+ private long getNow() {
+ return System.nanoTime();
+ }
+
+ /**
+ * Get time between we receiving the operation and sending it to the Namenode.
+ * @return Processing time in nanoseconds.
+ */
+ private long getProcessingTime() {
+ if (START_TIME.get() != null && START_TIME.get() > 0 &&
+ PROXY_TIME.get() != null && PROXY_TIME.get() > 0) {
+ return PROXY_TIME.get() - START_TIME.get();
+ }
+ return -1;
+ }
+
+ /**
+ * Get time between now and when the operation was forwarded to the Namenode.
+ * @return Current proxy time in nanoseconds.
+ */
+ private long getProxyTime() {
+ if (PROXY_TIME.get() != null && PROXY_TIME.get() > 0) {
+ return getNow() - PROXY_TIME.get();
+ }
+ return -1;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
new file mode 100644
index 0000000..23cd675
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -0,0 +1,624 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeMXBean;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeStatusMXBean;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Expose the Namenode metrics as if the Router were one.
+ */
+public class NamenodeBeanMetrics
+ implements FSNamesystemMBean, NameNodeMXBean, NameNodeStatusMXBean {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NamenodeBeanMetrics.class);
+
+ private final Router router;
+
+ /** FSNamesystem bean. */
+ private ObjectName fsBeanName;
+ /** FSNamesystemState bean. */
+ private ObjectName fsStateBeanName;
+ /** NameNodeInfo bean. */
+ private ObjectName nnInfoBeanName;
+ /** NameNodeStatus bean. */
+ private ObjectName nnStatusBeanName;
+
+
+ public NamenodeBeanMetrics(Router router) {
+ this.router = router;
+
+ try {
+ // TODO this needs to be done with the Metrics from FSNamesystem
+ StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
+ this.fsBeanName = MBeans.register("NameNode", "FSNamesystem", bean);
+ LOG.info("Registered FSNamesystem MBean: {}", this.fsBeanName);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad FSNamesystem MBean setup", e);
+ }
+
+ try {
+ StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
+ this.fsStateBeanName =
+ MBeans.register("NameNode", "FSNamesystemState", bean);
+ LOG.info("Registered FSNamesystemState MBean: {}", this.fsStateBeanName);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad FSNamesystemState MBean setup", e);
+ }
+
+ try {
+ StandardMBean bean = new StandardMBean(this, NameNodeMXBean.class);
+ this.nnInfoBeanName = MBeans.register("NameNode", "NameNodeInfo", bean);
+ LOG.info("Registered NameNodeInfo MBean: {}", this.nnInfoBeanName);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad NameNodeInfo MBean setup", e);
+ }
+
+ try {
+ StandardMBean bean = new StandardMBean(this, NameNodeStatusMXBean.class);
+ this.nnStatusBeanName =
+ MBeans.register("NameNode", "NameNodeStatus", bean);
+ LOG.info("Registered NameNodeStatus MBean: {}", this.nnStatusBeanName);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
+ }
+ }
+
+ /**
+  * De-register the JMX interfaces.
+  * Every bean registered in the constructor is unregistered here and its
+  * name is cleared, so calling this method again is harmless.
+  */
+ public void close() {
+   // The FSNamesystem bean is registered in the constructor as well
+   if (fsBeanName != null) {
+     MBeans.unregister(fsBeanName);
+     fsBeanName = null;
+   }
+   if (fsStateBeanName != null) {
+     MBeans.unregister(fsStateBeanName);
+     fsStateBeanName = null;
+   }
+   if (nnInfoBeanName != null) {
+     MBeans.unregister(nnInfoBeanName);
+     nnInfoBeanName = null;
+   }
+   // Remove the NameNode status bean
+   if (nnStatusBeanName != null) {
+     MBeans.unregister(nnStatusBeanName);
+     nnStatusBeanName = null;
+   }
+ }
+
+ private FederationMetrics getFederationMetrics() {
+ return this.router.getMetrics();
+ }
+
+ /////////////////////////////////////////////////////////
+ // NameNodeMXBean
+ /////////////////////////////////////////////////////////
+
+ @Override
+ public String getVersion() {
+ return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
+ }
+
+ @Override
+ public String getSoftwareVersion() {
+ return VersionInfo.getVersion();
+ }
+
+ @Override
+ public long getUsed() {
+ return getFederationMetrics().getUsedCapacity();
+ }
+
+ @Override
+ public long getFree() {
+ return getFederationMetrics().getRemainingCapacity();
+ }
+
+ @Override
+ public long getTotal() {
+ return getFederationMetrics().getTotalCapacity();
+ }
+
+ @Override
+ public String getSafemode() {
+ // We assume that the global federated view is never in safe mode
+ return "";
+ }
+
+ @Override
+ public boolean isUpgradeFinalized() {
+ // We assume the upgrade is always finalized in a federated view
+ return true;
+ }
+
+ @Override
+ public RollingUpgradeInfo.Bean getRollingUpgradeStatus() {
+ return null;
+ }
+
+ @Override
+ public long getNonDfsUsedSpace() {
+ return 0;
+ }
+
+ @Override
+ public float getPercentUsed() {
+ return DFSUtilClient.getPercentUsed(getCapacityUsed(), getCapacityTotal());
+ }
+
+ @Override
+ public float getPercentRemaining() {
+   // Use the "remaining" helper rather than the "used" one for this ratio
+   return DFSUtilClient.getPercentRemaining(
+       getCapacityRemaining(), getCapacityTotal());
+ }
+
+ @Override
+ public long getCacheUsed() {
+ return 0;
+ }
+
+ @Override
+ public long getCacheCapacity() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockPoolUsedSpace() {
+ return 0;
+ }
+
+ @Override
+ public float getPercentBlockPoolUsed() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalBlocks() {
+ return getFederationMetrics().getNumBlocks();
+ }
+
+ @Override
+ public long getNumberOfMissingBlocks() {
+ return getFederationMetrics().getNumOfMissingBlocks();
+ }
+
+ @Override
+ @Deprecated
+ public long getPendingReplicationBlocks() {
+ return getFederationMetrics().getNumOfBlocksPendingReplication();
+ }
+
+ @Override
+ public long getPendingReconstructionBlocks() {
+ return getFederationMetrics().getNumOfBlocksPendingReplication();
+ }
+
+ @Override
+ @Deprecated
+ public long getUnderReplicatedBlocks() {
+ return getFederationMetrics().getNumOfBlocksUnderReplicated();
+ }
+
+ @Override
+ public long getLowRedundancyBlocks() {
+ return getFederationMetrics().getNumOfBlocksUnderReplicated();
+ }
+
+ @Override
+ public long getPendingDeletionBlocks() {
+ return getFederationMetrics().getNumOfBlocksPendingDeletion();
+ }
+
+ @Override
+ public long getScheduledReplicationBlocks() {
+ return -1;
+ }
+
+ @Override
+ public long getNumberOfMissingBlocksWithReplicationFactorOne() {
+ return 0;
+ }
+
+ @Override
+ public String getCorruptFiles() {
+ return "N/A";
+ }
+
+ @Override
+ public int getThreads() {
+ return ManagementFactory.getThreadMXBean().getThreadCount();
+ }
+
+ @Override
+ public String getLiveNodes() {
+ return this.getNodes(DatanodeReportType.LIVE);
+ }
+
+ @Override
+ public String getDeadNodes() {
+ return this.getNodes(DatanodeReportType.DEAD);
+ }
+
+ @Override
+ public String getDecomNodes() {
+ return this.getNodes(DatanodeReportType.DECOMMISSIONING);
+ }
+
+ /**
+ * Get all the nodes in the federation of a particular type.
+ * TODO this is expensive, we may want to cache it.
+ * @param type Type of the datanodes to check.
+ * @return JSON with the nodes.
+ */
+ private String getNodes(DatanodeReportType type) {
+ final Map<String, Map<String, Object>> info = new HashMap<>();
+ try {
+ RouterRpcServer rpcServer = this.router.getRpcServer();
+ DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
+ for (DatanodeInfo node : datanodes) {
+ Map<String, Object> innerinfo = new HashMap<>();
+ innerinfo.put("infoAddr", node.getInfoAddr());
+ innerinfo.put("infoSecureAddr", node.getInfoSecureAddr());
+ innerinfo.put("xferaddr", node.getXferAddr());
+ innerinfo.put("location", node.getNetworkLocation());
+ innerinfo.put("lastContact", getLastContact(node));
+ innerinfo.put("usedSpace", node.getDfsUsed());
+ innerinfo.put("adminState", node.getAdminState().toString());
+ innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed());
+ innerinfo.put("capacity", node.getCapacity());
+ innerinfo.put("numBlocks", -1); // node.numBlocks()
+ innerinfo.put("version", (node.getSoftwareVersion() == null ?
+ "UNKNOWN" : node.getSoftwareVersion()));
+ innerinfo.put("used", node.getDfsUsed());
+ innerinfo.put("remaining", node.getRemaining());
+ innerinfo.put("blockScheduled", -1); // node.getBlocksScheduled()
+ innerinfo.put("blockPoolUsed", node.getBlockPoolUsed());
+ innerinfo.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent());
+ innerinfo.put("volfails", -1); // node.getVolumeFailures()
+ info.put(node.getHostName() + ":" + node.getXferPort(),
+ Collections.unmodifiableMap(innerinfo));
+ }
+ } catch (StandbyException e) {
+ LOG.error("Cannot get {} nodes, Router in safe mode", type);
+ } catch (IOException e) {
+ LOG.error("Cannot get " + type + " nodes", e);
+ }
+ return JSON.toString(info);
+ }
+
+ @Override
+ public String getClusterId() {
+ try {
+ return getNamespaceInfo(FederationNamespaceInfo::getClusterId).toString();
+ } catch (IOException e) {
+ LOG.error("Cannot fetch cluster ID metrics {}", e.getMessage());
+ return "";
+ }
+ }
+
+ @Override
+ public String getBlockPoolId() {
+ try {
+ return
+ getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId).toString();
+ } catch (IOException e) {
+ LOG.error("Cannot fetch block pool ID metrics {}", e.getMessage());
+ return "";
+ }
+ }
+
+ /**
+ * Build a set of unique values found in all namespaces.
+ *
+ * @param f Method reference of the appropriate FederationNamespaceInfo
+ * getter function
+ * @return Set of unique string values found in all discovered namespaces.
+ * @throws IOException if the query could not be executed.
+ */
+ private Collection<String> getNamespaceInfo(
+ Function<FederationNamespaceInfo, String> f) throws IOException {
+ StateStoreService stateStore = router.getStateStore();
+ MembershipStore membershipStore =
+ stateStore.getRegisteredRecordStore(MembershipStore.class);
+
+ GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+ GetNamespaceInfoResponse response =
+ membershipStore.getNamespaceInfo(request);
+ return response.getNamespaceInfo().stream()
+ .map(f)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public String getNameDirStatuses() {
+ return "N/A";
+ }
+
+ @Override
+ public String getNodeUsage() {
+ return "N/A";
+ }
+
+ @Override
+ public String getNameJournalStatus() {
+ return "N/A";
+ }
+
+ @Override
+ public String getJournalTransactionInfo() {
+ return "N/A";
+ }
+
+ @Override
+ public long getNNStartedTimeInMillis() {
+ return this.router.getStartTime();
+ }
+
+ @Override
+ public String getCompileInfo() {
+ return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
+ " from " + VersionInfo.getBranch();
+ }
+
+ @Override
+ public int getDistinctVersionCount() {
+ return 0;
+ }
+
+ @Override
+ public Map<String, Integer> getDistinctVersions() {
+ return null;
+ }
+
+ /////////////////////////////////////////////////////////
+ // FSNamesystemMBean
+ /////////////////////////////////////////////////////////
+
+ @Override
+ public String getFSState() {
+ // We assume it is not in safe mode
+ return "Operational";
+ }
+
+ @Override
+ public long getBlocksTotal() {
+ return this.getTotalBlocks();
+ }
+
+ @Override
+ public long getCapacityTotal() {
+ return this.getTotal();
+ }
+
+ @Override
+ public long getCapacityRemaining() {
+ return this.getFree();
+ }
+
+ @Override
+ public long getCapacityUsed() {
+ return this.getUsed();
+ }
+
+ @Override
+ public long getFilesTotal() {
+ return getFederationMetrics().getNumFiles();
+ }
+
+ @Override
+ public int getTotalLoad() {
+ return -1;
+ }
+
+ @Override
+ public int getNumLiveDataNodes() {
+ return this.router.getMetrics().getNumLiveNodes();
+ }
+
+ @Override
+ public int getNumDeadDataNodes() {
+ return this.router.getMetrics().getNumDeadNodes();
+ }
+
+ @Override
+ public int getNumStaleDataNodes() {
+ return -1;
+ }
+
+ @Override
+ public int getNumDecomLiveDataNodes() {
+ return this.router.getMetrics().getNumDecomLiveNodes();
+ }
+
+ @Override
+ public int getNumDecomDeadDataNodes() {
+ return this.router.getMetrics().getNumDecomDeadNodes();
+ }
+
+ @Override
+ public int getNumDecommissioningDataNodes() {
+ return this.router.getMetrics().getNumDecommissioningNodes();
+ }
+
+ @Override
+ public int getNumInMaintenanceLiveDataNodes() {
+ return 0;
+ }
+
+ @Override
+ public int getNumInMaintenanceDeadDataNodes() {
+ return 0;
+ }
+
+ @Override
+ public int getNumEnteringMaintenanceDataNodes() {
+ return 0;
+ }
+
+ @Override
+ public int getVolumeFailuresTotal() {
+ return 0;
+ }
+
+ @Override
+ public long getEstimatedCapacityLostTotal() {
+ return 0;
+ }
+
+ @Override
+ public String getSnapshotStats() {
+ return null;
+ }
+
+ @Override
+ public long getMaxObjects() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockDeletionStartTime() {
+ return -1;
+ }
+
+ @Override
+ public int getNumStaleStorages() {
+ return -1;
+ }
+
+ @Override
+ public String getTopUserOpCounts() {
+ return "N/A";
+ }
+
+ @Override
+ public int getFsLockQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public long getTotalSyncCount() {
+ return 0;
+ }
+
+ @Override
+ public String getTotalSyncTimes() {
+ return "";
+ }
+
+ private long getLastContact(DatanodeInfo node) {
+ return (now() - node.getLastUpdate()) / 1000;
+ }
+
+ /////////////////////////////////////////////////////////
+ // NameNodeStatusMXBean
+ /////////////////////////////////////////////////////////
+
+ @Override
+ public String getNNRole() {
+ return NamenodeRole.NAMENODE.toString();
+ }
+
+ @Override
+ public String getState() {
+ return HAServiceState.ACTIVE.toString();
+ }
+
+ @Override
+ public String getHostAndPort() {
+ return NetUtils.getHostPortString(router.getRpcServerAddress());
+ }
+
+ @Override
+ public boolean isSecurityEnabled() {
+ return false;
+ }
+
+ @Override
+ public long getLastHATransitionTime() {
+ return 0;
+ }
+
+ @Override
+ public long getBytesWithFutureGenerationStamps() {
+ return 0;
+ }
+
+ @Override
+ public String getSlowPeersReport() {
+ return "N/A";
+ }
+
+ @Override
+ public String getSlowDisksReport() {
+ return "N/A";
+ }
+
+ @Override
+ public long getNumberOfSnapshottableDirs() {
+ return 0;
+ }
+
+ @Override
+ public String getEnteringMaintenanceNodes() {
+ return "N/A";
+ }
+
+ @Override
+ public String getNameDirSize() {
+ return "N/A";
+ }
+
+ @Override
+ public int getNumEncryptionZones() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java
new file mode 100644
index 0000000..5e4ccab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * JMX interface for the State Store metrics.
+ * <p>
+ * For each kind of State Store operation (read, write, failure and remove)
+ * it exposes an operation count and an average. NOTE(review): the averages
+ * are presumably latencies; confirm the unit against the code that records
+ * the samples.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface StateStoreMBean {
+
+ /**
+ * Get the number of read operations.
+ *
+ * @return Number of read operations.
+ */
+ long getReadOps();
+
+ /**
+ * Get the average for the read operations.
+ *
+ * @return Average for the read operations.
+ */
+ double getReadAvg();
+
+ /**
+ * Get the number of write operations.
+ *
+ * @return Number of write operations.
+ */
+ long getWriteOps();
+
+ /**
+ * Get the average for the write operations.
+ *
+ * @return Average for the write operations.
+ */
+ double getWriteAvg();
+
+ /**
+ * Get the number of failed operations.
+ *
+ * @return Number of failed operations.
+ */
+ long getFailureOps();
+
+ /**
+ * Get the average for the failed operations.
+ *
+ * @return Average for the failed operations.
+ */
+ double getFailureAvg();
+
+ /**
+ * Get the number of remove operations.
+ *
+ * @return Number of remove operations.
+ */
+ long getRemoveOps();
+
+ /**
+ * Get the average for the remove operations.
+ *
+ * @return Average for the remove operations.
+ */
+ double getRemoveAvg();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
new file mode 100644
index 0000000..c17eabc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the JMX interface for the State Store metrics.
+ * <p>
+ * Keeps one {@link MutableRate} per kind of State Store transaction (GET,
+ * PUT, REMOVE and failed ones) and one integer gauge per cache whose size is
+ * reported through {@link #setCacheSize(String, int)}.
+ */
+@Metrics(name = "StateStoreActivity", about = "Router metrics",
+    context = "router")
+public final class StateStoreMetrics implements StateStoreMBean {
+
+  /** Registry that carries the tags and the cache size gauges. */
+  private final MetricsRegistry registry = new MetricsRegistry("router");
+
+  @Metric("GET transactions")
+  private MutableRate reads;
+  @Metric("PUT transactions")
+  private MutableRate writes;
+  @Metric("REMOVE transactions")
+  private MutableRate removes;
+  @Metric("Failed transactions")
+  private MutableRate failures;
+
+  /**
+   * Cache size gauges indexed by gauge name ("Cache" + name + "Size").
+   * NOTE(review): plain HashMap with no synchronization; confirm that cache
+   * sizes are not updated concurrently.
+   */
+  private final Map<String, MutableGaugeInt> cacheSizes = new HashMap<>();
+
+  /**
+   * Create the metrics and tag the registry with the Router session and
+   * process name.
+   *
+   * @param conf Configuration; currently not read by this class.
+   */
+  private StateStoreMetrics(Configuration conf) {
+    registry.tag(SessionId, "RouterSession");
+    registry.tag(ProcessName, "Router");
+  }
+
+  /**
+   * Create the State Store metrics and register them in the default metrics
+   * system.
+   *
+   * @param conf Configuration passed to the new instance.
+   * @return The instance returned by the metrics system registration.
+   */
+  public static StateStoreMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(new StateStoreMetrics(conf));
+  }
+
+  /**
+   * Shut down the default metrics system and reset these metrics.
+   */
+  public void shutdown() {
+    DefaultMetricsSystem.shutdown();
+    reset();
+  }
+
+  /**
+   * Record a read (GET) transaction.
+   *
+   * @param latency Latency of the transaction.
+   */
+  public void addRead(long latency) {
+    reads.add(latency);
+  }
+
+  @Override
+  public long getReadOps() {
+    return reads.lastStat().numSamples();
+  }
+
+  @Override
+  public double getReadAvg() {
+    return reads.lastStat().mean();
+  }
+
+  /**
+   * Record a write (PUT) transaction.
+   *
+   * @param latency Latency of the transaction.
+   */
+  public void addWrite(long latency) {
+    writes.add(latency);
+  }
+
+  @Override
+  public long getWriteOps() {
+    return writes.lastStat().numSamples();
+  }
+
+  @Override
+  public double getWriteAvg() {
+    return writes.lastStat().mean();
+  }
+
+  /**
+   * Record a failed transaction.
+   *
+   * @param latency Latency of the transaction.
+   */
+  public void addFailure(long latency) {
+    failures.add(latency);
+  }
+
+  @Override
+  public long getFailureOps() {
+    return failures.lastStat().numSamples();
+  }
+
+  @Override
+  public double getFailureAvg() {
+    return failures.lastStat().mean();
+  }
+
+  /**
+   * Record a remove (REMOVE) transaction.
+   *
+   * @param latency Latency of the transaction.
+   */
+  public void addRemove(long latency) {
+    removes.add(latency);
+  }
+
+  @Override
+  public long getRemoveOps() {
+    return removes.lastStat().numSamples();
+  }
+
+  @Override
+  public double getRemoveAvg() {
+    return removes.lastStat().mean();
+  }
+
+  /**
+   * Set the size of the cache for a State Store interface.
+   *
+   * @param name Name of the record to cache.
+   * @param size Number of records.
+   */
+  public void setCacheSize(String name, int size) {
+    String counterName = "Cache" + name + "Size";
+    MutableGaugeInt counter = cacheSizes.get(counterName);
+    if (counter == null) {
+      // First report for this cache: create its gauge in the registry and
+      // remember it so later calls only update the value.
+      counter = registry.newGauge(counterName, name, size);
+      cacheSizes.put(counterName, counter);
+    }
+    counter.set(size);
+  }
+
+  /**
+   * Reset the min/max values of the four transaction rates.
+   */
+  @VisibleForTesting
+  public void reset() {
+    reads.resetMinMax();
+    writes.resetMinMax();
+    removes.resetMinMax();
+    failures.resetMinMax();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java
new file mode 100644
index 0000000..c56c823
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Report metrics for Router-based Federation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index d93d498..543d964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
+import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -283,6 +285,27 @@ public class ConnectionManager {
}
/**
+ * Get a JSON representation of all the connection pools.
+ *
+ * @return JSON representation of all the connection pools.
+ */
+ public String getJSON() {
+   // Sorted map: the output is ordered by the pool identifier string.
+   final Map<String, String> poolsInfo = new TreeMap<>();
+   readLock.lock();
+   try {
+     // One entry per pool: identifier -> JSON summary of that pool.
+     for (Entry<ConnectionPoolId, ConnectionPool> poolEntry :
+         this.pools.entrySet()) {
+       final String poolName = poolEntry.getKey().toString();
+       poolsInfo.put(poolName, poolEntry.getValue().getJSON());
+     }
+   } finally {
+     readLock.unlock();
+   }
+   // Serialize outside of the lock; only the local map is used here.
+   return JSON.toString(poolsInfo);
+ }
+
+ /**
* Removes stale connections not accessed recently from the pool. This is
* invoked periodically.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index f76f621..ca113ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +48,7 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
+import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -258,6 +261,26 @@ public class ConnectionPool {
}
/**
+ * JSON representation of the connection pool.
+ *
+ * @return String representation of the JSON.
+ */
+ public String getJSON() {
+   // Insertion-ordered map: the totals come first, then per-connection data.
+   final Map<String, String> summary = new LinkedHashMap<>();
+   summary.put("active", Integer.toString(getNumActiveConnections()));
+   summary.put("total", Integer.toString(getNumConnections()));
+   // Per-connection details are only added when debug logging is enabled.
+   if (LOG.isDebugEnabled()) {
+     // Read the field once and work on that reference for the whole loop.
+     final List<ConnectionContext> snapshot = this.connections;
+     for (int idx = 0; idx < snapshot.size(); idx++) {
+       final ConnectionContext context = snapshot.get(idx);
+       summary.put(idx + " active", Boolean.toString(context.isActive()));
+       summary.put(idx + " closed", Boolean.toString(context.isClosed()));
+     }
+   }
+   return JSON.toString(summary);
+ }
+
+ /**
* Create a new proxy wrapper for a client NN connection.
* @return Proxy for the target ClientProtocol that contains the user's
* security context.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index fcbd2eb..3ab5e2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -36,10 +36,14 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
@@ -96,6 +100,12 @@ public class Router extends CompositeService {
/** Updates the namenode status in the namenode resolver. */
private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
+ /** Router metrics. */
+ private RouterMetricsService metrics;
+
+ /** JVM pauses (GC and others). */
+ private JvmPauseMonitor pauseMonitor;
+
/** Usage string for help message. */
private static final String USAGE = "Usage: java Router";
@@ -174,18 +184,46 @@ public class Router extends CompositeService {
}
}
+ // Router metrics system
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) {
+
+ DefaultMetricsSystem.initialize("Router");
+
+ this.metrics = new RouterMetricsService(this);
+ addService(this.metrics);
+
+ // JVM pause monitor
+ this.pauseMonitor = new JvmPauseMonitor();
+ this.pauseMonitor.init(conf);
+ }
+
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
+ // The pause monitor is only created when the Router metrics are enabled,
+ // so a non-null monitor implies that this.metrics was created as well.
+ if (this.pauseMonitor != null) {
+ this.pauseMonitor.start();
+ JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
+ // Attach the monitor to the JVM metrics only when they are available.
+ if (jvmMetrics != null) {
+ jvmMetrics.setPauseMonitor(pauseMonitor);
+ }
+ }
+
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
+ // JVM pause monitor
+ // It is null when the Router metrics are disabled; it is stopped before
+ // delegating to super.serviceStop().
+ if (this.pauseMonitor != null) {
+ this.pauseMonitor.stop();
+ }
+
super.serviceStop();
}
@@ -419,6 +457,30 @@ public class Router extends CompositeService {
}
/**
+ * Get the metrics system for the Router.
+ *
+ * @return Router metrics.
+ */
+ public RouterMetrics getRouterMetrics() {
+   // Without a metrics service there is nothing to report.
+   return this.metrics == null ? null : this.metrics.getRouterMetrics();
+ }
+
+ /**
+ * Get the federation metrics.
+ *
+ * @return Federation metrics.
+ */
+ public FederationMetrics getMetrics() {
+   // Without a metrics service there is nothing to report.
+   return this.metrics == null ? null : this.metrics.getFederationMetrics();
+ }
+
+ /**
* Get the subcluster resolver for files.
*
* @return Subcluster resolver for files.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org