Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 16:37:16 UTC

[1/2] hadoop git commit: HDFS-12335. Federation Metrics. Contributed by Inigo Goiri.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10467 da858ca36 -> d522007c7


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
new file mode 100644
index 0000000..851538a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+/**
+ * This class is for maintaining the various Router activity statistics
+ * and publishing them through the metrics interfaces.
+ */
+@Metrics(name="RouterActivity", about="Router metrics", context="dfs")
+public class RouterMetrics {
+
+  private final MetricsRegistry registry = new MetricsRegistry("router");
+
+  @Metric("Duration in SafeMode at startup in msec")
+  private MutableGaugeInt safeModeTime;
+
+  private JvmMetrics jvmMetrics = null;
+
+  RouterMetrics(
+      String processName, String sessionId, final JvmMetrics jvmMetrics) {
+    this.jvmMetrics = jvmMetrics;
+    registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+  }
+
+  public static RouterMetrics create(Configuration conf) {
+    String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+    String processName = "Router";
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms);
+
+    return ms.register(new RouterMetrics(processName, sessionId, jm));
+  }
+
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+
+  public void shutdown() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  public void setSafeModeTime(long elapsed) {
+    safeModeTime.set((int) elapsed);
+  }
+}
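
[Editor's note: for readers skimming the diff, a minimal usage sketch of the class above may help. It assumes RouterMetrics is importable from org.apache.hadoop.hdfs.server.federation.router; everything else is standard Hadoop API.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.federation.router.RouterMetrics;
    import org.apache.hadoop.util.Time;

    public class RouterMetricsExample {
      public static void main(String[] args) {
        // Registers the "RouterActivity" source with the default
        // metrics system, tagged with process name and session id.
        Configuration conf = new HdfsConfiguration();
        RouterMetrics metrics = RouterMetrics.create(conf);

        // Record how long startup spent in safe mode (msec gauge).
        long start = Time.monotonicNow();
        // ... startup work while in safe mode would happen here ...
        metrics.setSafeModeTime(Time.monotonicNow() - start);

        // Note: this shuts down the whole DefaultMetricsSystem,
        // not just this source.
        metrics.shutdown();
      }
    }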

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
new file mode 100644
index 0000000..f4debce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Service to manage the metrics of the Router.
+ */
+public class RouterMetricsService extends AbstractService {
+
+  /** Router for these metrics. */
+  private final Router router;
+
+  /** Router metrics. */
+  private RouterMetrics routerMetrics;
+  /** Federation metrics. */
+  private FederationMetrics federationMetrics;
+  /** Namenode mock metrics. */
+  private NamenodeBeanMetrics nnMetrics;
+
+
+  public RouterMetricsService(final Router router) {
+    super(RouterMetricsService.class.getName());
+    this.router = router;
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+    this.routerMetrics = RouterMetrics.create(configuration);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Wrapper for all the FSNamesystem JMX interfaces
+    this.nnMetrics = new NamenodeBeanMetrics(this.router);
+
+    // Federation MBean JMX interface
+    this.federationMetrics = new FederationMetrics(this.router);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Remove JMX interfaces
+    if (this.federationMetrics != null) {
+      this.federationMetrics.close();
+    }
+
+    // Remove Namenode JMX interfaces
+    if (this.nnMetrics != null) {
+      this.nnMetrics.close();
+    }
+
+    // Shutdown metrics
+    if (this.routerMetrics != null) {
+      this.routerMetrics.shutdown();
+    }
+  }
+
+  /**
+   * Get the metrics system for the Router.
+   *
+   * @return Router metrics.
+   */
+  public RouterMetrics getRouterMetrics() {
+    return this.routerMetrics;
+  }
+
+  /**
+   * Get the federation metrics.
+   *
+   * @return Federation metrics.
+   */
+  public FederationMetrics getFederationMetrics() {
+    return this.federationMetrics;
+  }
+
+  /**
+   * Get the JVM metrics for the Router.
+   *
+   * @return JVM metrics.
+   */
+  public JvmMetrics getJvmMetrics() {
+    if (this.routerMetrics == null) {
+      return null;
+    }
+    return this.routerMetrics.getJvmMetrics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 3a32be1..5c33c2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -97,6 +97,8 @@ public class RouterRpcClient {
   private final ExecutorService executorService;
   /** Retry policy for router -> NN communication. */
   private final RetryPolicy retryPolicy;
+  /** Optional perf monitor. */
+  private final RouterRpcMonitor rpcMonitor;
 
   /** Pattern to parse a stack trace line. */
   private static final Pattern STACK_TRACE_PATTERN =
@@ -111,8 +113,7 @@ public class RouterRpcClient {
    * @param monitor Optional performance monitor.
    */
   public RouterRpcClient(Configuration conf, String identifier,
-      ActiveNamenodeResolver resolver) {
-
+      ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
     this.routerId = identifier;
 
     this.namenodeResolver = resolver;
@@ -125,6 +126,8 @@ public class RouterRpcClient {
         .build();
     this.executorService = Executors.newCachedThreadPool(threadFactory);
 
+    this.rpcMonitor = monitor;
+
     int maxFailoverAttempts = conf.getInt(
         HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
         HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
@@ -192,6 +195,15 @@ public class RouterRpcClient {
   }
 
   /**
+   * JSON representation of the connection pool.
+   *
+   * @return String representation of the JSON.
+   */
+  public String getJSON() {
+    return this.connectionManager.getJSON();
+  }
+
+  /**
    * Get ClientProtocol proxy client for a NameNode. Each combination of user +
    * NN must use a unique proxy client. Previously created clients are cached
    * and stored in a connection pool by the ConnectionManager.
@@ -294,6 +306,9 @@ public class RouterRpcClient {
     }
 
     Object ret = null;
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
     boolean failover = false;
     Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
     for (FederationNamenodeContext namenode : namenodes) {
@@ -310,18 +325,31 @@ public class RouterRpcClient {
           InetSocketAddress address = client.getAddress();
           namenodeResolver.updateActiveNamenode(nsId, address);
         }
+        if (this.rpcMonitor != null) {
+          this.rpcMonitor.proxyOpComplete(true);
+        }
         return ret;
       } catch (IOException ioe) {
         ioes.put(namenode, ioe);
         if (ioe instanceof StandbyException) {
           // Fail over indicated by retry policy and/or NN
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpFailureStandby();
+          }
           failover = true;
         } else if (ioe instanceof RemoteException) {
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpComplete(true);
+          }
           // RemoteException returned by NN
           throw (RemoteException) ioe;
         } else {
           // Other communication error, this is a failure
           // Communication retries are handled by the retry policy
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpFailureCommunicate();
+            this.rpcMonitor.proxyOpComplete(false);
+          }
           throw ioe;
         }
       } finally {
@@ -330,6 +358,9 @@ public class RouterRpcClient {
         }
       }
     }
+    if (this.rpcMonitor != null) {
+      this.rpcMonitor.proxyOpComplete(false);
+    }
 
     // All namenodes were unavailable or in standby
     String msg = "No namenode available to invoke " + method.getName() + " " +
@@ -746,6 +777,10 @@ public class RouterRpcClient {
       }
     }
 
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
+
     try {
       List<Future<Object>> futures = executorService.invokeAll(callables);
       Map<T, Object> results = new TreeMap<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
new file mode 100644
index 0000000..d889a56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+
+/**
+ * Metrics and monitoring interface for the router RPC server. Allows pluggable
+ * diagnostics and monitoring services to be attached.
+ */
+public interface RouterRpcMonitor {
+
+  /**
+   * Initialize the monitor.
+   * @param conf Configuration for the monitor.
+   * @param server RPC server.
+   * @param store State Store.
+   */
+  void init(
+      Configuration conf, RouterRpcServer server, StateStoreService store);
+
+  /**
+   * Close the monitor.
+   */
+  void close();
+
+  /**
+   * Start processing an operation on the Router.
+   */
+  void startOp();
+
+  /**
+   * Start proxying an operation to the Namenode.
+   * @return Id of the thread doing the proxying.
+   */
+  long proxyOp();
+
+  /**
+   * Mark a proxy operation as completed.
+   * @param success If the operation was successful.
+   */
+  void proxyOpComplete(boolean success);
+
+  /**
+   * Failed to proxy an operation to a Namenode because it was in standby.
+   */
+  void proxyOpFailureStandby();
+
+  /**
+   * Failed to proxy an operation to a Namenode because of an unexpected
+   * exception.
+   */
+  void proxyOpFailureCommunicate();
+
+  /**
+   * Failed to proxy an operation because it is not implemented.
+   */
+  void proxyOpNotImplemented();
+
+  /**
+   * Called when the Router cannot contact the State Store.
+   */
+  void routerFailureStateStore();
+
+  /**
+   * Called when an operation fails because the Router is in safe mode.
+   */
+  void routerFailureSafemode();
+
+  /**
+   * Called when an operation fails because the path is locked.
+   */
+  void routerFailureLocked();
+
+  /**
+   * Called when an operation fails because the path is read-only.
+   */
+  void routerFailureReadOnly();
+}
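
[Editor's note: the interface above is the pluggable hook that RouterRpcClient and RouterRpcServer call into. As a reference point, here is a minimal no-op implementation; the class is illustrative and not part of this commit (the shipped default is FederationRPCPerformanceMonitor, per the hdfs-default.xml change below).]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;

    /** Illustrative monitor that discards every event. */
    public class NoopRouterRpcMonitor implements RouterRpcMonitor {
      @Override
      public void init(Configuration conf, RouterRpcServer server,
          StateStoreService store) {}
      @Override
      public void close() {}
      @Override
      public void startOp() {}
      @Override
      public long proxyOp() {
        // The contract asks for the id of the proxying thread.
        return Thread.currentThread().getId();
      }
      @Override
      public void proxyOpComplete(boolean success) {}
      @Override
      public void proxyOpFailureStandby() {}
      @Override
      public void proxyOpFailureCommunicate() {}
      @Override
      public void proxyOpNotImplemented() {}
      @Override
      public void routerFailureStateStore() {}
      @Override
      public void routerFailureSafemode() {}
      @Override
      public void routerFailureLocked() {}
      @Override
      public void routerFailureReadOnly() {}
    }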

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index f9b4a5d..6aee1ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -120,6 +120,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,6 +160,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   /** RPC clients to connect to the Namenodes. */
   private final RouterRpcClient rpcClient;
 
+  /** Monitor metrics for the RPC calls. */
+  private final RouterRpcMonitor rpcMonitor;
+
 
   /** Interface to identify the active NN for a nameservice or blockpool ID. */
   private final ActiveNamenodeResolver namenodeResolver;
@@ -256,14 +260,28 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     this.rpcAddress = new InetSocketAddress(
         confRpcAddress.getHostName(), listenAddress.getPort());
 
+    // Create metrics monitor
+    Class<? extends RouterRpcMonitor> rpcMonitorClass = this.conf.getClass(
+        DFSConfigKeys.DFS_ROUTER_METRICS_CLASS,
+        DFSConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT,
+        RouterRpcMonitor.class);
+    this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf);
+
     // Create the client
     this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
-        this.namenodeResolver);
+        this.namenodeResolver, this.rpcMonitor);
   }
 
   @Override
   protected void serviceInit(Configuration configuration) throws Exception {
     this.conf = configuration;
+
+    if (this.rpcMonitor == null) {
+      LOG.error("Cannot instantiate Router RPC metrics class");
+    } else {
+      this.rpcMonitor.init(this.conf, this, this.router.getStateStore());
+    }
+
     super.serviceInit(configuration);
   }
 
@@ -281,6 +299,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     if (this.rpcServer != null) {
       this.rpcServer.stop();
     }
+    if (rpcMonitor != null) {
+      this.rpcMonitor.close();
+    }
     super.serviceStop();
   }
 
@@ -294,6 +315,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   }
 
   /**
+   * Get the RPC monitor and metrics.
+   *
+   * @return RPC monitor and metrics.
+   */
+  public RouterRpcMonitor getRPCMonitor() {
+    return rpcMonitor;
+  }
+
+  /**
    * Allow access to the client RPC server for testing.
    *
    * @return The RPC server.
@@ -330,6 +360,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     checkOperation(op);
 
     if (!supported) {
+      if (rpcMonitor != null) {
+        rpcMonitor.proxyOpNotImplemented();
+      }
       String methodName = getMethodName();
       throw new UnsupportedOperationException(
           "Operation \"" + methodName + "\" is not supported");
@@ -346,6 +379,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    */
   private void checkOperation(OperationCategory op) throws StandbyException {
     // Log the function we are currently calling.
+    if (rpcMonitor != null) {
+      rpcMonitor.startOp();
+    }
+    // Log the function we are currently calling.
     if (LOG.isDebugEnabled()) {
       String methodName = getMethodName();
       LOG.debug("Proxying operation: {}", methodName);
@@ -1912,16 +1949,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    */
   private List<RemoteLocation> getLocationsForPath(
       String path, boolean failIfLocked) throws IOException {
-    // Check the location for this path
-    final PathLocation location =
-        this.subclusterResolver.getDestinationForPath(path);
-    if (location == null) {
-      throw new IOException("Cannot find locations for " + path + " in " +
-          this.subclusterResolver);
-    }
+    try {
+      // Check the location for this path
+      final PathLocation location =
+          this.subclusterResolver.getDestinationForPath(path);
+      if (location == null) {
+        throw new IOException("Cannot find locations for " + path + " in " +
+            this.subclusterResolver);
+      }
 
-    // Log the access to a path
-    return location.getDestinations();
+      return location.getDestinations();
+    } catch (IOException ioe) {
+      if (this.rpcMonitor != null) {
+        this.rpcMonitor.routerFailureStateStore();
+      }
+      throw ioe;
+    }
   }
 
   /**
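
[Editor's note: the hunk above wires the monitor in through ReflectionUtils, keyed on dfs.federation.router.metrics.class. A short sketch of selecting a custom implementation programmatically; NoopRouterRpcMonitor is the hypothetical class sketched after the RouterRpcMonitor interface earlier in this message, assumed to be in the same package here.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MonitorConfigExample {
      public static Configuration withCustomMonitor() {
        Configuration conf = new HdfsConfiguration();
        // RouterRpcServer will instantiate this class reflectively.
        conf.setClass(DFSConfigKeys.DFS_ROUTER_METRICS_CLASS,
            NoopRouterRpcMonitor.class, RouterRpcMonitor.class);
        return conf;
      }
    }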

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 90a6699..fbece88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -140,6 +141,13 @@ public abstract class CachedRecordStore<R extends BaseRecord>
         writeLock.unlock();
       }
 
+      // Update the metrics for the cache State Store size
+      StateStoreMetrics metrics = getDriver().getMetrics();
+      if (metrics != null) {
+        String recordName = getRecordClass().getSimpleName();
+        metrics.setCacheSize(recordName, this.records.size());
+      }
+
       lastUpdate = Time.monotonicNow();
     }
     return true;
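
[Editor's note: StateStoreMetrics is referenced here but its source is not in this part of the email, so the following is only a hedged sketch of what a setCacheSize() helper could look like: one lazily created gauge per record type. Names and structure are assumptions, not the committed code.]

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableGaugeInt;

    /** Hypothetical sketch of the cache-size side of StateStoreMetrics. */
    public class StateStoreMetricsSketch {
      private final MetricsRegistry registry = new MetricsRegistry("StateStore");
      private final Map<String, MutableGaugeInt> cacheSizes = new HashMap<>();

      /** One gauge per record type, e.g. "CacheMembershipStateSize". */
      public synchronized void setCacheSize(String name, int size) {
        String gaugeName = "Cache" + name + "Size";
        MutableGaugeInt gauge = cacheSizes.get(gaugeName);
        if (gauge == null) {
          // First update for this record type: create the gauge.
          gauge = registry.newGauge(gaugeName,
              "Number of cached " + name + " records", size);
          cacheSizes.put(gaugeName, gauge);
        } else {
          gauge.set(size);
        }
      }
    }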

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 3aa3ffd..0289ba6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -25,15 +25,23 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
@@ -87,6 +95,8 @@ public class StateStoreService extends CompositeService {
   /** Service to maintain data store connection. */
   private StateStoreConnectionMonitorService monitorService;
 
+  /** StateStore metrics. */
+  private StateStoreMetrics metrics;
 
   /** Supported record stores. */
   private final Map<
@@ -152,6 +162,21 @@ public class StateStoreService extends CompositeService {
     this.cacheUpdater = new StateStoreCacheUpdateService(this);
     addService(this.cacheUpdater);
 
+    // Create metrics for the State Store
+    this.metrics = StateStoreMetrics.create(conf);
+
+    // Adding JMX interface
+    try {
+      StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class);
+      ObjectName registeredObject =
+          MBeans.register("Router", "StateStore", bean);
+      LOG.info("Registered StateStoreMBean: {}", registeredObject);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad StateStoreMBean setup", e);
+    } catch (MetricsException e) {
+      LOG.info("Failed to register State Store bean {}", e.getMessage());
+    }
+
     super.serviceInit(this.conf);
   }
 
@@ -165,6 +190,11 @@ public class StateStoreService extends CompositeService {
   protected void serviceStop() throws Exception {
     closeDriver();
 
+    if (metrics != null) {
+      metrics.shutdown();
+      metrics = null;
+    }
+
     super.serviceStop();
   }
 
@@ -228,7 +258,8 @@ public class StateStoreService extends CompositeService {
     synchronized (this.driver) {
       if (!isDriverReady()) {
         String driverName = this.driver.getClass().getSimpleName();
-        if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
+        if (this.driver.init(
+            conf, getIdentifier(), getSupportedRecords(), metrics)) {
           LOG.info("Connection to the State Store driver {} is open and ready",
               driverName);
           this.refreshCaches();
@@ -398,4 +429,13 @@ public class StateStoreService extends CompositeService {
     throw new IOException("Registered cache was not found for " + clazz);
   }
 
+  /**
+   * Get the metrics for the State Store.
+   *
+   * @return State Store metrics.
+   */
+  public StateStoreMetrics getMetrics() {
+    return metrics;
+  }
+
 }
\ No newline at end of file
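
[Editor's note: once the StandardMBean above is registered, it is reachable from any JMX client under "Hadoop:service=Router,name=StateStore", the same name the new TestFederationMetrics uses below. A minimal in-process lookup sketch; StateStoreMBean's accessors are not shown in this hunk, so the proxy is typed only against the interface.]

    import java.lang.management.ManagementFactory;

    import javax.management.JMX;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean;

    public class StateStoreJmxLookup {
      public static StateStoreMBean lookup() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Name produced by MBeans.register("Router", "StateStore", bean).
        ObjectName name =
            new ObjectName("Hadoop:service=Router,name=StateStore");
        return JMX.newMBeanProxy(server, name, StateStoreMBean.class);
      }
    }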

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
index 90111bf..3ebab0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
 import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
@@ -46,6 +47,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
   /** Identifier for the driver. */
   private String identifier;
 
+  /** State Store metrics. */
+  private StateStoreMetrics metrics;
+
 
   /**
    * Initialize the state store connection.
@@ -56,10 +60,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
    * @return If initialized and ready, false if failed to initialize driver.
    */
   public boolean init(final Configuration config, final String id,
-      final Collection<Class<? extends BaseRecord>> records) {
+      final Collection<Class<? extends BaseRecord>> records,
+      final StateStoreMetrics stateStoreMetrics) {
 
     this.conf = config;
     this.identifier = id;
+    this.metrics = stateStoreMetrics;
 
     if (this.identifier == null) {
       LOG.warn("The identifier for the State Store connection is not set");
@@ -101,6 +107,15 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
   }
 
   /**
+   * Get the metrics for the State Store.
+   *
+   * @return State Store metrics.
+   */
+  public StateStoreMetrics getMetrics() {
+    return this.metrics;
+  }
+
+  /**
    * Prepare the driver to access data storage.
    *
    * @return True if the driver was successfully initialized. If false is

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
index e2038fa..7bc93de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 
@@ -41,8 +42,9 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
 
   @Override
   public boolean init(final Configuration config, final String id,
-      final Collection<Class<? extends BaseRecord>> records) {
-    boolean ret = super.init(config, id, records);
+      final Collection<Class<? extends BaseRecord>> records,
+      final StateStoreMetrics metrics) {
+    boolean ret = super.init(config, id, records, metrics);
 
     this.serializer = StateStoreSerializer.getSerializer(config);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index ddcd537..97c821e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
 import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
 import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
 import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -123,6 +124,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
   public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
       throws IOException {
     verifyDriverReady();
+    long start = monotonicNow();
     List<T> ret = new ArrayList<>();
     String znode = getZNodeForClass(clazz);
     try {
@@ -157,11 +159,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
         }
       }
     } catch (Exception e) {
+      getMetrics().addFailure(monotonicNow() - start);
       String msg = "Cannot get children for \"" + znode + "\": " +
           e.getMessage();
       LOG.error(msg);
       throw new IOException(msg);
     }
+    long end = monotonicNow();
+    getMetrics().addRead(end - start);
     return new QueryResult<T>(ret, getTime());
   }
 
@@ -178,6 +183,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
     Class<? extends BaseRecord> recordClass = record0.getClass();
     String znode = getZNodeForClass(recordClass);
 
+    long start = monotonicNow();
     boolean status = true;
     for (T record : records) {
       String primaryKey = getPrimaryKey(record);
@@ -187,6 +193,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
         status = false;
       }
     }
+    long end = monotonicNow();
+    if (status) {
+      getMetrics().addWrite(end - start);
+    } else {
+      getMetrics().addFailure(end - start);
+    }
     return status;
   }
 
@@ -199,12 +211,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
     }
 
     // Read the current data
+    long start = monotonicNow();
     List<T> records = null;
     try {
       QueryResult<T> result = get(clazz);
       records = result.getRecords();
     } catch (IOException ex) {
       LOG.error("Cannot get existing records", ex);
+      getMetrics().addFailure(monotonicNow() - start);
       return 0;
     }
 
@@ -226,14 +240,20 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
         }
       } catch (Exception e) {
         LOG.error("Cannot remove \"{}\"", existingRecord, e);
+        getMetrics().addFailure(monotonicNow() - start);
       }
     }
+    long end = monotonicNow();
+    if (removed > 0) {
+      getMetrics().addRemove(end - start);
+    }
     return removed;
   }
 
   @Override
   public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
       throws IOException {
+    long start = monotonicNow();
     boolean status = true;
     String znode = getZNodeForClass(clazz);
     LOG.info("Deleting all children under {}", znode);
@@ -248,6 +268,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
       LOG.error("Cannot remove {}: {}", znode, e.getMessage());
       status = false;
     }
+    long time = monotonicNow() - start;
+    if (status) {
+      getMetrics().addRemove(time);
+    } else {
+      getMetrics().addFailure(time);
+    }
     return status;
   }
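
[Editor's note: every hunk in this file follows the same shape: sample monotonicNow() before the ZooKeeper call, then report the elapsed time to addRead()/addWrite()/addRemove() on success or addFailure() on error. Condensed, the pattern is the sketch below; the operation body is a placeholder.]

    import static org.apache.hadoop.util.Time.monotonicNow;

    private boolean timedWrite() {
      long start = monotonicNow();
      boolean success = true;
      try {
        // ... the actual ZooKeeper create/setData call would go here ...
      } catch (Exception e) {
        success = false;
      }
      long elapsed = monotonicNow() - start;
      if (success) {
        getMetrics().addWrite(elapsed);   // or addRead() / addRemove()
      } else {
        getMetrics().addFailure(elapsed);
      }
      return success;
    }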
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
index ab0ff0a..ac0b22e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
@@ -154,7 +154,7 @@ public abstract class MembershipState extends BaseRecord
 
   public abstract void setStats(MembershipStats stats);
 
-  public abstract MembershipStats getStats() throws IOException;
+  public abstract MembershipStats getStats();
 
   public abstract void setLastContact(long contact);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index 16f2b8b..0a3f19d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.store.records;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,28 @@ public abstract class MountTable extends BaseRecord {
   private static final Logger LOG = LoggerFactory.getLogger(MountTable.class);
 
 
+  /** Comparator for paths that accounts for the '/' separator. */
+  public static final Comparator<String> PATH_COMPARATOR =
+      new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
+          String s1 = o1.replace('/', ' ');
+          String s2 = o2.replace('/', ' ');
+          return s1.compareTo(s2);
+        }
+      };
+
+  /** Comparator based on the mount table source. */
+  public static final Comparator<MountTable> SOURCE_COMPARATOR =
+      new Comparator<MountTable>() {
+        public int compare(MountTable m1, MountTable m2) {
+          String src1 = m1.getSourcePath();
+          String src2 = m2.getSourcePath();
+          return PATH_COMPARATOR.compare(src1, src2);
+        }
+      };
+
+
   /**
    * Default constructor for a mount table entry.
    */
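
[Editor's note: the PATH_COMPARATOR trick merits a worked example. Replacing '/' with ' ' makes the separator sort before every other printable character, so a mount point's children stay adjacent to the parent instead of interleaving with sibling names that extend the prefix. A small snippet:]

    import java.util.Arrays;

    String[] paths = {"/app.backup", "/app/db", "/app"};

    // Plain String order: '.' (0x2E) sorts before '/' (0x2F), so
    // "/app.backup" splits "/app" from its child "/app/db".
    Arrays.sort(paths);
    // -> [/app, /app.backup, /app/db]

    // PATH_COMPARATOR maps '/' to ' ' (0x20), which sorts first:
    Arrays.sort(paths, MountTable.PATH_COMPARATOR);
    // -> [/app, /app/db, /app.backup]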

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
index 805c2af..614957b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
@@ -288,7 +288,7 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord {
   }
 
   @Override
-  public MembershipStats getStats() throws IOException {
+  public MembershipStats getStats() {
     NamenodeMembershipStatsRecordProto statsProto =
         this.translator.getProtoOrBuilder().getStats();
     MembershipStats stats =
@@ -298,7 +298,8 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord {
       statsPB.setProto(statsProto);
       return statsPB;
     } else {
-      throw new IOException("Cannot get stats for the membership");
+      throw new IllegalArgumentException(
+          "Cannot get stats for the membership");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 17989f2..9cef6b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4726,7 +4726,24 @@
     </description>
   </property>
 
-    <property>
+  <property>
+    <name>dfs.federation.router.metrics.enable</name>
+    <value>true</value>
+    <description>
+      Whether the metrics in the Router are enabled.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.metrics.class</name>
+    <value>org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor</value>
+    <description>
+      Class to monitor the RPC system in the router. It must implement the
+      RouterRpcMonitor interface.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.admin.enable</name>
     <value>true</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index bbb548ca..0c01763 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -26,12 +26,18 @@ import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Date;
 import java.util.List;
 import java.util.Random;
 
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -135,6 +141,13 @@ public final class FederationTestUtils {
     return Math.abs(d1.getTime() - d2.getTime()) < precision;
   }
 
+  public static <T> T getBean(String name, Class<T> obj)
+      throws MalformedObjectNameException {
+    MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+    ObjectName poolName = new ObjectName(name);
+    return JMX.newMXBeanProxy(mBeanServer, poolName, obj);
+  }
+
   public static boolean addDirectory(FileSystem context, String path)
       throws IOException {
     context.mkdirs(new Path(path), new FsPermission("777"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index cac5e6b..58ca1d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -32,6 +32,7 @@ public class RouterConfigBuilder {
   private boolean enableHeartbeat = false;
   private boolean enableLocalHeartbeat = false;
   private boolean enableStateStore = false;
+  private boolean enableMetrics = false;
 
   public RouterConfigBuilder(Configuration configuration) {
     this.conf = configuration;
@@ -47,6 +48,7 @@ public class RouterConfigBuilder {
     this.enableHeartbeat = true;
     this.enableLocalHeartbeat = true;
     this.enableStateStore = true;
+    this.enableMetrics = true;
     return this;
   }
 
@@ -75,6 +77,11 @@ public class RouterConfigBuilder {
     return this;
   }
 
+  public RouterConfigBuilder metrics(boolean enable) {
+    this.enableMetrics = enable;
+    return this;
+  }
+
   public RouterConfigBuilder rpc() {
     return this.rpc(true);
   }
@@ -91,6 +98,10 @@ public class RouterConfigBuilder {
     return this.stateStore(true);
   }
 
+  public RouterConfigBuilder metrics() {
+    return this.metrics(true);
+  }
+
   public Configuration build() {
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
         this.enableStateStore);
@@ -101,6 +112,8 @@ public class RouterConfigBuilder {
         this.enableHeartbeat);
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
         this.enableLocalHeartbeat);
+    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
+        this.enableMetrics);
     return conf;
   }
 }
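
[Editor's note: in short, tests opt into router subsystems feature by feature; the sketch below mirrors the setup used by TestMetricsBase.setupBase() further down in this message.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;

    // Enable only the State Store and the new metrics subsystem.
    Configuration conf = new RouterConfigBuilder()
        .stateStore()
        .metrics()
        .build();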

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
new file mode 100644
index 0000000..d6a194f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.management.MalformedObjectNameException;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Test;
+
+/**
+ * Test the JMX interface for the {@link Router}.
+ */
+public class TestFederationMetrics extends TestMetricsBase {
+
+  public static final String FEDERATION_BEAN =
+      "Hadoop:service=Router,name=FederationState";
+  public static final String STATE_STORE_BEAN =
+      "Hadoop:service=Router,name=StateStore";
+  public static final String RPC_BEAN =
+      "Hadoop:service=Router,name=FederationRPC";
+
+  @Test
+  public void testClusterStatsJMX()
+      throws MalformedObjectNameException, IOException {
+
+    FederationMBean bean = getBean(FEDERATION_BEAN, FederationMBean.class);
+    validateClusterStatsBean(bean);
+  }
+
+  @Test
+  public void testClusterStatsDataSource() throws IOException {
+    FederationMetrics metrics = getRouter().getMetrics();
+    validateClusterStatsBean(metrics);
+  }
+
+  @Test
+  public void testMountTableStatsDataSource()
+      throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getMountTable();
+    JSONArray jsonArray = new JSONArray(jsonString);
+    assertEquals(jsonArray.length(), getMockMountTable().size());
+
+    int match = 0;
+    for (int i = 0; i < jsonArray.length(); i++) {
+      JSONObject json = jsonArray.getJSONObject(i);
+      String src = json.getString("sourcePath");
+
+      for (MountTable entry : getMockMountTable()) {
+        if (entry.getSourcePath().equals(src)) {
+          assertEquals(entry.getDefaultLocation().getNameserviceId(),
+              json.getString("nameserviceId"));
+          assertEquals(entry.getDefaultLocation().getDest(),
+              json.getString("path"));
+          assertNotNullAndNotEmpty(json.getString("dateCreated"));
+          assertNotNullAndNotEmpty(json.getString("dateModified"));
+          match++;
+        }
+      }
+    }
+    assertEquals(match, getMockMountTable().size());
+  }
+
+  private MembershipState findMockNamenode(String nsId, String nnId) {
+
+    @SuppressWarnings("unchecked")
+    List<MembershipState> namenodes =
+        ListUtils.union(getActiveMemberships(), getStandbyMemberships());
+    for (MembershipState nn : namenodes) {
+      if (nn.getNamenodeId().equals(nnId)
+          && nn.getNameserviceId().equals(nsId)) {
+        return nn;
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testNamenodeStatsDataSource() throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getNamenodes();
+    JSONObject jsonObject = new JSONObject(jsonString);
+    Iterator<?> keys = jsonObject.keys();
+    int nnsFound = 0;
+    while (keys.hasNext()) {
+      // Validate each entry against our mocks
+      JSONObject json = jsonObject.getJSONObject((String) keys.next());
+      String nameserviceId = json.getString("nameserviceId");
+      String namenodeId = json.getString("namenodeId");
+
+      MembershipState mockEntry =
+          this.findMockNamenode(nameserviceId, namenodeId);
+      assertNotNull(mockEntry);
+
+      assertEquals(json.getString("state"), mockEntry.getState().toString());
+      MembershipStats stats = mockEntry.getStats();
+      assertEquals(json.getLong("numOfActiveDatanodes"),
+          stats.getNumOfActiveDatanodes());
+      assertEquals(json.getLong("numOfDeadDatanodes"),
+          stats.getNumOfDeadDatanodes());
+      assertEquals(json.getLong("numOfDecommissioningDatanodes"),
+          stats.getNumOfDecommissioningDatanodes());
+      assertEquals(json.getLong("numOfDecomActiveDatanodes"),
+          stats.getNumOfDecomActiveDatanodes());
+      assertEquals(json.getLong("numOfDecomDeadDatanodes"),
+          stats.getNumOfDecomDeadDatanodes());
+      assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks());
+      assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress());
+      assertEquals(json.getString("webAddress"), mockEntry.getWebAddress());
+      nnsFound++;
+    }
+    // Validate all memberships are present
+    assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
+        nnsFound);
+  }
+
+  @Test
+  public void testNameserviceStatsDataSource()
+      throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getNameservices();
+    JSONObject jsonObject = new JSONObject(jsonString);
+    Iterator<?> keys = jsonObject.keys();
+    int nameservicesFound = 0;
+    while (keys.hasNext()) {
+      JSONObject json = jsonObject.getJSONObject((String) keys.next());
+      String nameserviceId = json.getString("nameserviceId");
+      String namenodeId = json.getString("namenodeId");
+
+      MembershipState mockEntry =
+          this.findMockNamenode(nameserviceId, namenodeId);
+      assertNotNull(mockEntry);
+
+      // NS should report the active NN
+      assertEquals(mockEntry.getState().toString(), json.getString("state"));
+      assertEquals("ACTIVE", json.getString("state"));
+
+      // Stats in the NS should reflect the stats for the most active NN
+      MembershipStats stats = mockEntry.getStats();
+      assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles"));
+      assertEquals(stats.getTotalSpace(), json.getLong("totalSpace"));
+      assertEquals(stats.getAvailableSpace(),
+          json.getLong("availableSpace"));
+      assertEquals(stats.getNumOfBlocksMissing(),
+          json.getLong("numOfBlocksMissing"));
+      assertEquals(stats.getNumOfActiveDatanodes(),
+          json.getLong("numOfActiveDatanodes"));
+      assertEquals(stats.getNumOfDeadDatanodes(),
+          json.getLong("numOfDeadDatanodes"));
+      assertEquals(stats.getNumOfDecommissioningDatanodes(),
+          json.getLong("numOfDecommissioningDatanodes"));
+      assertEquals(stats.getNumOfDecomActiveDatanodes(),
+          json.getLong("numOfDecomActiveDatanodes"));
+      assertEquals(stats.getNumOfDecomDeadDatanodes(),
+          json.getLong("numOfDecomDeadDatanodes"));
+      nameservicesFound++;
+    }
+    assertEquals(getNameservices().size(), nameservicesFound);
+  }
+
+  private void assertNotNullAndNotEmpty(String field) {
+    assertNotNull(field);
+    assertTrue(field.length() > 0);
+  }
+
+  private void validateClusterStatsBean(FederationMBean bean)
+      throws IOException {
+
+    // Determine aggregates
+    long numBlocks = 0;
+    long numLive = 0;
+    long numDead = 0;
+    long numDecom = 0;
+    long numDecomLive = 0;
+    long numDecomDead = 0;
+    long numFiles = 0;
+    for (MembershipState mock : getActiveMemberships()) {
+      MembershipStats stats = mock.getStats();
+      numBlocks += stats.getNumOfBlocks();
+      numLive += stats.getNumOfActiveDatanodes();
+      numDead += stats.getNumOfDeadDatanodes();
+      numDecom += stats.getNumOfDecommissioningDatanodes();
+      numDecomLive += stats.getNumOfDecomActiveDatanodes();
+      numDecomDead += stats.getNumOfDecomDeadDatanodes();
+    }
+
+    assertEquals(numBlocks, bean.getNumBlocks());
+    assertEquals(numLive, bean.getNumLiveNodes());
+    assertEquals(numDead, bean.getNumDeadNodes());
+    assertEquals(numDecom, bean.getNumDecommissioningNodes());
+    assertEquals(numDecomLive, bean.getNumDecomLiveNodes());
+    assertEquals(numDecomDead, bean.getNumDecomDeadNodes());
+    assertEquals(numFiles, bean.getNumFiles());
+    assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
+        bean.getNumNamenodes());
+    assertEquals(getNameservices().size(), bean.getNumNameservices());
+    assertTrue(bean.getVersion().length() > 0);
+    assertTrue(bean.getCompiledDate().length() > 0);
+    assertTrue(bean.getCompileInfo().length() > 0);
+    assertTrue(bean.getRouterStarted().length() > 0);
+    assertTrue(bean.getHostAndPort().length() > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
new file mode 100644
index 0000000..bbcfbe8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearAllRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test the basic metrics functionality.
+ */
+public class TestMetricsBase {
+
+  private StateStoreService stateStore;
+  private MembershipStore membershipStore;
+  private Router router;
+  private Configuration routerConfig;
+
+  private List<MembershipState> activeMemberships;
+  private List<MembershipState> standbyMemberships;
+  private List<MountTable> mockMountTable;
+  private List<String> nameservices;
+
+  @Before
+  public void setupBase() throws Exception {
+
+    if (router == null) {
+      routerConfig = new RouterConfigBuilder()
+          .stateStore()
+          .metrics()
+          .build();
+      router = new Router();
+      router.init(routerConfig);
+      router.setRouterId("routerId");
+      router.start();
+      stateStore = router.getStateStore();
+
+      membershipStore =
+          stateStore.getRegisteredRecordStore(MembershipStore.class);
+
+      // Read all data and load all caches
+      waitStateStore(stateStore, 10000);
+      createFixtures();
+      stateStore.refreshCaches(true);
+      Thread.sleep(1000);
+    }
+  }
+
+  @After
+  public void tearDownBase() throws IOException {
+    if (router != null) {
+      router.stop();
+      router.close();
+      router = null;
+    }
+  }
+
+  private void createFixtures() throws IOException {
+    // Clear all records
+    clearAllRecords(stateStore);
+
+    nameservices = new ArrayList<>();
+    nameservices.add(NAMESERVICES[0]);
+    nameservices.add(NAMESERVICES[1]);
+
+    // 2 NNs per NS
+    activeMemberships = new ArrayList<>();
+    standbyMemberships = new ArrayList<>();
+
+    for (String nameservice : nameservices) {
+      MembershipState namenode1 = createMockRegistrationForNamenode(
+          nameservice, NAMENODES[0], FederationNamenodeServiceState.ACTIVE);
+      NamenodeHeartbeatRequest request1 =
+          NamenodeHeartbeatRequest.newInstance(namenode1);
+      assertTrue(membershipStore.namenodeHeartbeat(request1).getResult());
+      activeMemberships.add(namenode1);
+
+      MembershipState namenode2 = createMockRegistrationForNamenode(
+          nameservice, NAMENODES[1], FederationNamenodeServiceState.STANDBY);
+      NamenodeHeartbeatRequest request2 =
+          NamenodeHeartbeatRequest.newInstance(namenode2);
+      assertTrue(membershipStore.namenodeHeartbeat(request2).getResult());
+      standbyMemberships.add(namenode2);
+    }
+
+    // Add 2 mount table memberships
+    mockMountTable = createMockMountTable(nameservices);
+    synchronizeRecords(stateStore, mockMountTable, MountTable.class);
+  }
+
+  protected Router getRouter() {
+    return router;
+  }
+
+  protected List<MountTable> getMockMountTable() {
+    return mockMountTable;
+  }
+
+  protected List<MembershipState> getActiveMemberships() {
+    return activeMemberships;
+  }
+
+  protected List<MembershipState> getStandbyMemberships() {
+    return standbyMemberships;
+  }
+
+  protected List<String> getNameservices() {
+    return nameservices;
+  }
+
+  protected StateStoreService getStateStore() {
+    return stateStore;
+  }
+}

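A hedged sketch of a concrete test built on these fixtures (the test class name is hypothetical; the store and request types are the ones used elsewhere in this patch):

    public class TestMembershipFixtures extends TestMetricsBase {
      @org.junit.Test
      public void testRegisteredNamenodes() throws Exception {
        MembershipStore store = getStateStore()
            .getRegisteredRecordStore(MembershipStore.class);
        GetNamenodeRegistrationsRequest request =
            GetNamenodeRegistrationsRequest.newInstance();
        int registered = store.getNamenodeRegistrations(request)
            .getNamenodeMemberships().size();
        // Every namenode registered by createFixtures() should be present
        org.junit.Assert.assertEquals(
            getActiveMemberships().size() + getStandbyMemberships().size(),
            registered);
      }
    }
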
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index 2074d3d..e22be84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.service.Service.STATE;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -46,15 +48,19 @@ public class TestRouter {
   public static void create() throws IOException {
     // Basic configuration without the state store
     conf = new Configuration();
+    // 1 ms cache refresh
+    conf.setInt(DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
     // Mock resolver classes
-    conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
-        MockResolver.class.getCanonicalName());
-    conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
-        MockResolver.class.getCanonicalName());
+    conf.setClass(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, ActiveNamenodeResolver.class);
+    conf.setClass(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, FileSubclusterResolver.class);
 
     // Bind to any available port
     conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
     conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
 
     // Simulate a co-located NN
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
@@ -95,9 +101,18 @@ public class TestRouter {
   @Test
   public void testRouterService() throws InterruptedException, IOException {
 
+    // Admin only
+    testRouterStartup(new RouterConfigBuilder(conf).admin().build());
+
     // Rpc only
     testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
 
+    // Metrics only
+    testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
+
+    // State store only
+    testRouterStartup(new RouterConfigBuilder(conf).stateStore().build());
+
     // Heartbeat only
     testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
 

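The builder calls compose, so a single Router can be started with several services enabled; a small sketch following the same pattern:

    // RPC plus metrics in one Router instance
    Router router = new Router();
    router.init(new RouterConfigBuilder(conf).rpc().metrics().build());
    router.start();
    // ... exercise the router ...
    router.stop();
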
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 65e763b..01fe149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@@ -377,6 +378,74 @@ public class TestStateStoreDriverBase {
     testFetchErrors(driver, MountTable.class);
   }
 
+  public void testMetrics(StateStoreDriver driver)
+      throws IOException, IllegalArgumentException, IllegalAccessException {
+
+    MountTable insertRecord =
+        this.generateFakeRecord(MountTable.class);
+
+    // Put single
+    StateStoreMetrics metrics = stateStore.getMetrics();
+    assertEquals(0, metrics.getWriteOps());
+    driver.put(insertRecord, true, false);
+    assertEquals(1, metrics.getWriteOps());
+
+    // Put the same record again (update); still one write op
+    metrics.reset();
+    assertEquals(0, metrics.getWriteOps());
+    driver.put(insertRecord, true, false);
+    assertEquals(1, metrics.getWriteOps());
+
+    // Get Single
+    metrics.reset();
+    assertEquals(0, metrics.getReadOps());
+
+    final String querySourcePath = insertRecord.getSourcePath();
+    MountTable partial = MountTable.newInstance();
+    partial.setSourcePath(querySourcePath);
+    final Query<MountTable> query = new Query<>(partial);
+    driver.get(MountTable.class, query);
+    assertEquals(1, metrics.getReadOps());
+
+    // GetAll
+    metrics.reset();
+    assertEquals(0, metrics.getReadOps());
+    driver.get(MountTable.class);
+    assertEquals(1, metrics.getReadOps());
+
+    // GetMultiple
+    metrics.reset();
+    assertEquals(0, metrics.getReadOps());
+    driver.getMultiple(MountTable.class, query);
+    assertEquals(1, metrics.getReadOps());
+
+    // Insert fails
+    metrics.reset();
+    assertEquals(0, metrics.getFailureOps());
+    driver.put(insertRecord, false, true);
+    assertEquals(1, metrics.getFailureOps());
+
+    // Remove single
+    metrics.reset();
+    assertEquals(0, metrics.getRemoveOps());
+    driver.remove(insertRecord);
+    assertEquals(1, metrics.getRemoveOps());
+
+    // Remove multiple
+    metrics.reset();
+    driver.put(insertRecord, true, false);
+    assertEquals(0, metrics.getRemoveOps());
+    driver.remove(MountTable.class, query);
+    assertEquals(1, metrics.getRemoveOps());
+
+    // Remove all
+    metrics.reset();
+    driver.put(insertRecord, true, false);
+    assertEquals(0, metrics.getRemoveOps());
+    driver.removeAll(MountTable.class);
+    assertEquals(1, metrics.getRemoveOps());
+  }
+
   /**
    * Sets the value of a field on the object.
    *

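For context, the Query objects above follow a query-by-example pattern: only the fields populated on the partial record are matched. A minimal sketch reusing the types from this test:

    // Match mount table entries whose source path is exactly "/tmp"
    MountTable partial = MountTable.newInstance();
    partial.setSourcePath("/tmp");
    Query<MountTable> query = new Query<>(partial);
    driver.get(MountTable.class, query);          // first match; one read op
    driver.getMultiple(MountTable.class, query);  // all matches; one read op
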

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: HDFS-12335. Federation Metrics. Contributed by Inigo Goiri.

Posted by in...@apache.org.
HDFS-12335. Federation Metrics. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d522007c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d522007c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d522007c

Branch: refs/heads/HDFS-10467
Commit: d522007c7b3f48cc826347b36b7854645f991f2f
Parents: da858ca
Author: Inigo Goiri <in...@apache.org>
Authored: Fri Sep 8 09:37:10 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Fri Sep 8 09:37:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  11 +
 .../federation/metrics/FederationMBean.java     | 204 ++++++
 .../federation/metrics/FederationMetrics.java   | 673 +++++++++++++++++++
 .../federation/metrics/FederationRPCMBean.java  |  90 +++
 .../metrics/FederationRPCMetrics.java           | 239 +++++++
 .../FederationRPCPerformanceMonitor.java        | 211 ++++++
 .../federation/metrics/NamenodeBeanMetrics.java | 624 +++++++++++++++++
 .../federation/metrics/StateStoreMBean.java     |  45 ++
 .../federation/metrics/StateStoreMetrics.java   | 144 ++++
 .../server/federation/metrics/package-info.java |  27 +
 .../federation/router/ConnectionManager.java    |  23 +
 .../federation/router/ConnectionPool.java       |  23 +
 .../hdfs/server/federation/router/Router.java   |  62 ++
 .../server/federation/router/RouterMetrics.java |  73 ++
 .../federation/router/RouterMetricsService.java | 108 +++
 .../federation/router/RouterRpcClient.java      |  39 +-
 .../federation/router/RouterRpcMonitor.java     |  95 +++
 .../federation/router/RouterRpcServer.java      |  63 +-
 .../federation/store/CachedRecordStore.java     |   8 +
 .../federation/store/StateStoreService.java     |  42 +-
 .../store/driver/StateStoreDriver.java          |  17 +-
 .../driver/impl/StateStoreSerializableImpl.java |   6 +-
 .../driver/impl/StateStoreZooKeeperImpl.java    |  26 +
 .../store/records/MembershipState.java          |   2 +-
 .../federation/store/records/MountTable.java    |  23 +
 .../records/impl/pb/MembershipStatePBImpl.java  |   5 +-
 .../src/main/resources/hdfs-default.xml         |  19 +-
 .../server/federation/FederationTestUtils.java  |  13 +
 .../server/federation/RouterConfigBuilder.java  |  13 +
 .../metrics/TestFederationMetrics.java          | 237 +++++++
 .../federation/metrics/TestMetricsBase.java     | 150 +++++
 .../server/federation/router/TestRouter.java    |  23 +-
 .../store/driver/TestStateStoreDriverBase.java  |  69 ++
 33 files changed, 3383 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5fd0811..91f0dab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFau
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
@@ -1149,6 +1151,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
 
+  public static final String DFS_ROUTER_METRICS_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "metrics.enable";
+  public static final boolean DFS_ROUTER_METRICS_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_METRICS_CLASS =
+      FEDERATION_ROUTER_PREFIX + "metrics.class";
+  public static final Class<? extends RouterRpcMonitor>
+      DFS_ROUTER_METRICS_CLASS_DEFAULT =
+          FederationRPCPerformanceMonitor.class;
+
   // HDFS Router heartbeat
   public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
       FEDERATION_ROUTER_PREFIX + "heartbeat.enable";

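For illustration, the new keys can be set programmatically as well as in hdfs-site.xml; a hedged sketch using the types defined above:

    Configuration conf = new Configuration();
    // Turn the Router metrics off entirely
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
    // Or swap in a different RPC monitor implementation
    conf.setClass(DFSConfigKeys.DFS_ROUTER_METRICS_CLASS,
        FederationRPCPerformanceMonitor.class, RouterRpcMonitor.class);
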
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java
new file mode 100644
index 0000000..43efb3c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * JMX interface for the federation statistics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FederationMBean {
+
+  /**
+   * Get information about all the namenodes in the federation, or null on
+   * failure.
+   * @return JSON with all the Namenodes.
+   */
+  String getNamenodes();
+
+  /**
+   * Get the latest info for each registered nameservice.
+   * @return JSON with all the nameservices.
+   */
+  String getNameservices();
+
+  /**
+   * Get the mount table for the federated filesystem, or null on failure.
+   * @return JSON with the mount table.
+   */
+  String getMountTable();
+
+  /**
+   * Get the total capacity of the federated cluster.
+   * @return Total capacity of the federated cluster.
+   */
+  long getTotalCapacity();
+
+  /**
+   * Get the used capacity of the federated cluster.
+   * @return Used capacity of the federated cluster.
+   */
+  long getUsedCapacity();
+
+  /**
+   * Get the remaining capacity of the federated cluster.
+   * @return Remaining capacity of the federated cluster.
+   */
+  long getRemainingCapacity();
+
+  /**
+   * Get the number of nameservices in the federation.
+   * @return Number of nameservices in the federation.
+   */
+  int getNumNameservices();
+
+  /**
+   * Get the number of namenodes.
+   * @return Number of namenodes.
+   */
+  int getNumNamenodes();
+
+  /**
+   * Get the number of expired namenodes.
+   * @return Number of expired namenodes.
+   */
+  int getNumExpiredNamenodes();
+
+  /**
+   * Get the number of live datanodes.
+   * @return Number of live datanodes.
+   */
+  int getNumLiveNodes();
+
+  /**
+   * Get the number of dead datanodes.
+   * @return Number of dead datanodes.
+   */
+  int getNumDeadNodes();
+
+  /**
+   * Get the number of decommissioning datanodes.
+   * @return Number of decommissioning datanodes.
+   */
+  int getNumDecommissioningNodes();
+
+  /**
+   * Get the number of live decommissioned datanodes.
+   * @return Number of live decommissioned datanodes.
+   */
+  int getNumDecomLiveNodes();
+
+  /**
+   * Get the number of dead decommissioned datanodes.
+   * @return Number of dead decommissioned datanodes.
+   */
+  int getNumDecomDeadNodes();
+
+  /**
+   * Get the max, median, min and standard deviation of DataNode usage.
+   * @return The DataNode usage information, as a JSON string.
+   */
+  String getNodeUsage();
+
+  /**
+   * Get the number of blocks in the federation.
+   * @return Number of blocks in the federation.
+   */
+  long getNumBlocks();
+
+  /**
+   * Get the number of missing blocks in the federation.
+   * @return Number of missing blocks in the federation.
+   */
+  long getNumOfMissingBlocks();
+
+  /**
+   * Get the number of pending replication blocks in the federation.
+   * @return Number of pending replication blocks in the federation.
+   */
+  long getNumOfBlocksPendingReplication();
+
+  /**
+   * Get the number of under replicated blocks in the federation.
+   * @return Number of under replicated blocks in the federation.
+   */
+  long getNumOfBlocksUnderReplicated();
+
+  /**
+   * Get the number of pending deletion blocks in the federation.
+   * @return Number of pending deletion blocks in the federation.
+   */
+  long getNumOfBlocksPendingDeletion();
+
+  /**
+   * Get the number of files in the federation.
+   * @return Number of files in the federation.
+   */
+  long getNumFiles();
+
+  /**
+   * Get when the router started.
+   * @return Date string of when the router started.
+   */
+  String getRouterStarted();
+
+  /**
+   * Get the version of the router.
+   * @return Version of the router.
+   */
+  String getVersion();
+
+  /**
+   * Get the compilation date of the router.
+   * @return Compilation date of the router.
+   */
+  String getCompiledDate();
+
+  /**
+   * Get the compilation info of the router.
+   * @return Compilation info of the router.
+   */
+  String getCompileInfo();
+
+  /**
+   * Get the host and port of the router.
+   * @return Host and port of the router.
+   */
+  String getHostAndPort();
+
+  /**
+   * Get the identifier of the router.
+   * @return Identifier of the router.
+   */
+  String getRouterId();
+
+  /**
+   * Get the cluster IDs of the namespaces in the federation.
+   * @return Cluster IDs of the namespaces.
+   */
+  String getClusterId();
+
+  /**
+   * Get the block pool IDs of the namespaces in the federation.
+   * @return Block pool IDs of the namespaces.
+   */
+  String getBlockPoolId();
+}

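A client can read this interface remotely through JMX. A hedged sketch; the ObjectName assumes Hadoop's usual "Hadoop:service=<service>,name=<name>" convention for the MBeans.register("Router", "FederationState", ...) call in FederationMetrics:

    import java.lang.management.ManagementFactory;
    import javax.management.JMX;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name =
        new ObjectName("Hadoop:service=Router,name=FederationState");
    FederationMBean proxy =
        JMX.newMBeanProxy(mbs, name, FederationMBean.class);
    System.out.println("Live datanodes: " + proxy.getNumLiveNodes());
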
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
new file mode 100644
index 0000000..1e80256
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.codehaus.jettison.json.JSONObject;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the Router metrics collector.
+ */
+public class FederationMetrics implements FederationMBean {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationMetrics.class);
+
+  /** Format for a date. */
+  private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";
+
+
+  /** Router interface. */
+  private final Router router;
+
+  /** FederationState JMX bean. */
+  private ObjectName beanName;
+
+  /** Resolve the namenode for each namespace. */
+  private final ActiveNamenodeResolver namenodeResolver;
+
+  /** State store. */
+  private final StateStoreService stateStore;
+  /** Membership state store. */
+  private MembershipStore membershipStore;
+  /** Mount table store. */
+  private MountTableStore mountTableStore;
+
+
+  public FederationMetrics(Router router) throws IOException {
+    this.router = router;
+
+    try {
+      StandardMBean bean = new StandardMBean(this, FederationMBean.class);
+      this.beanName = MBeans.register("Router", "FederationState", bean);
+      LOG.info("Registered Router MBean: {}", this.beanName);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad Router MBean setup", e);
+    }
+
+    // Resolve namenode for each nameservice
+    this.namenodeResolver = this.router.getNamenodeResolver();
+
+    // State store interfaces
+    this.stateStore = this.router.getStateStore();
+    if (this.stateStore == null) {
+      LOG.error("State store not available");
+    } else {
+      this.membershipStore = stateStore.getRegisteredRecordStore(
+          MembershipStore.class);
+      this.mountTableStore = stateStore.getRegisteredRecordStore(
+          MountTableStore.class);
+    }
+  }
+
+  /**
+   * Unregister the JMX beans.
+   */
+  public void close() {
+    if (this.beanName != null) {
+      MBeans.unregister(beanName);
+    }
+  }
+
+  @Override
+  public String getNamenodes() {
+    final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
+    try {
+      // Get the values from the store
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance();
+      GetNamenodeRegistrationsResponse response =
+          membershipStore.getNamenodeRegistrations(request);
+
+      // Order the namenodes
+      final List<MembershipState> namenodes = response.getNamenodeMemberships();
+      if (namenodes == null || namenodes.size() == 0) {
+        return JSON.toString(info);
+      }
+      List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
+      Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);
+
+      // Dump namenodes information into JSON
+      for (MembershipState namenode : namenodesOrder) {
+        Map<String, Object> innerInfo = new HashMap<>();
+        Map<String, Object> map = getJson(namenode);
+        innerInfo.putAll(map);
+        long dateModified = namenode.getDateModified();
+        long lastHeartbeat = getSecondsSince(dateModified);
+        innerInfo.put("lastHeartbeat", lastHeartbeat);
+        MembershipStats stats = namenode.getStats();
+        long used = stats.getTotalSpace() - stats.getAvailableSpace();
+        innerInfo.put("used", used);
+        info.put(namenode.getNamenodeKey(),
+            Collections.unmodifiableMap(innerInfo));
+      }
+    } catch (IOException e) {
+      LOG.error("Enable to fetch json representation of namenodes {}",
+          e.getMessage());
+      return "{}";
+    }
+    return JSON.toString(info);
+  }
+
+  @Override
+  public String getNameservices() {
+    final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
+    try {
+      final List<MembershipState> namenodes = getActiveNamenodeRegistrations();
+      List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
+      Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);
+
+      // Dump namenodes information into JSON
+      for (MembershipState namenode : namenodesOrder) {
+        Map<String, Object> innerInfo = new HashMap<>();
+        Map<String, Object> map = getJson(namenode);
+        innerInfo.putAll(map);
+        long dateModified = namenode.getDateModified();
+        long lastHeartbeat = getSecondsSince(dateModified);
+        innerInfo.put("lastHeartbeat", lastHeartbeat);
+        MembershipStats stats = namenode.getStats();
+        long used = stats.getTotalSpace() - stats.getAvailableSpace();
+        innerInfo.put("used", used);
+        info.put(namenode.getNamenodeKey(),
+            Collections.unmodifiableMap(innerInfo));
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage());
+      return "{}";
+    }
+    return JSON.toString(info);
+  }
+
+  @Override
+  public String getMountTable() {
+    final List<Map<String, Object>> info = new LinkedList<>();
+
+    try {
+      // Get all the mount points in order
+      GetMountTableEntriesRequest request =
+          GetMountTableEntriesRequest.newInstance("/");
+      GetMountTableEntriesResponse response =
+          mountTableStore.getMountTableEntries(request);
+      final List<MountTable> mounts = response.getEntries();
+      List<MountTable> orderedMounts = new ArrayList<>(mounts);
+      Collections.sort(orderedMounts, MountTable.SOURCE_COMPARATOR);
+
+      // Dump mount table entries information into JSON
+      for (MountTable entry : orderedMounts) {
+        // Summarize destinations
+        Set<String> nameservices = new LinkedHashSet<>();
+        Set<String> paths = new LinkedHashSet<>();
+        for (RemoteLocation location : entry.getDestinations()) {
+          nameservices.add(location.getNameserviceId());
+          paths.add(location.getDest());
+        }
+
+        Map<String, Object> map = getJson(entry);
+        // We add some values with a cleaner format
+        map.put("dateCreated", getDateString(entry.getDateCreated()));
+        map.put("dateModified", getDateString(entry.getDateModified()));
+
+        Map<String, Object> innerInfo = new HashMap<>();
+        innerInfo.putAll(map);
+        innerInfo.put("nameserviceId", StringUtils.join(",", nameservices));
+        innerInfo.put("path", StringUtils.join(",", paths));
+        if (nameservices.size() > 1) {
+          innerInfo.put("order", entry.getDestOrder().toString());
+        } else {
+          innerInfo.put("order", "");
+        }
+        innerInfo.put("readonly", entry.isReadOnly());
+        info.add(Collections.unmodifiableMap(innerInfo));
+      }
+    } catch (IOException e) {
+      LOG.error(
+          "Cannot generate JSON of mount table from store: {}", e.getMessage());
+      return "[]";
+    }
+    return JSON.toString(info);
+  }
+
+  @Override
+  public long getTotalCapacity() {
+    return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
+  }
+
+  @Override
+  public long getRemainingCapacity() {
+    return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace);
+  }
+
+  @Override
+  public long getUsedCapacity() {
+    return getTotalCapacity() - getRemainingCapacity();
+  }
+
+  @Override
+  public int getNumNameservices() {
+    try {
+      Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+      return nss.size();
+    } catch (IOException e) {
+      LOG.error(
+          "Cannot fetch number of expired registrations from the store: {}",
+          e.getMessage());
+      return 0;
+    }
+  }
+
+  @Override
+  public int getNumNamenodes() {
+    try {
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance();
+      GetNamenodeRegistrationsResponse response =
+          membershipStore.getNamenodeRegistrations(request);
+      List<MembershipState> memberships = response.getNamenodeMemberships();
+      return memberships.size();
+    } catch (IOException e) {
+      LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage());
+      return 0;
+    }
+  }
+
+  @Override
+  public int getNumExpiredNamenodes() {
+    try {
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance();
+      GetNamenodeRegistrationsResponse response =
+          membershipStore.getExpiredNamenodeRegistrations(request);
+      List<MembershipState> expiredMemberships =
+          response.getNamenodeMemberships();
+      return expiredMemberships.size();
+    } catch (IOException e) {
+      LOG.error(
+          "Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage());
+      return 0;
+    }
+  }
+
+  @Override
+  public int getNumLiveNodes() {
+    return getNameserviceAggregatedInt(
+        MembershipStats::getNumOfActiveDatanodes);
+  }
+
+  @Override
+  public int getNumDeadNodes() {
+    return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes);
+  }
+
+  @Override
+  public int getNumDecommissioningNodes() {
+    return getNameserviceAggregatedInt(
+        MembershipStats::getNumOfDecommissioningDatanodes);
+  }
+
+  @Override
+  public int getNumDecomLiveNodes() {
+    return getNameserviceAggregatedInt(
+        MembershipStats::getNumOfDecomActiveDatanodes);
+  }
+
+  @Override
+  public int getNumDecomDeadNodes() {
+    return getNameserviceAggregatedInt(
+        MembershipStats::getNumOfDecomDeadDatanodes);
+  }
+
+  @Override // FederationMBean; mirrors NameNodeMXBean#getNodeUsage
+  public String getNodeUsage() {
+    float median = 0;
+    float max = 0;
+    float min = 0;
+    float dev = 0;
+
+    final Map<String, Map<String, Object>> info = new HashMap<>();
+    try {
+      RouterRpcServer rpcServer = this.router.getRpcServer();
+      DatanodeInfo[] live =
+          rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
+
+      if (live.length > 0) {
+        float totalDfsUsed = 0;
+        float[] usages = new float[live.length];
+        int i = 0;
+        for (DatanodeInfo dn : live) {
+          usages[i++] = dn.getDfsUsedPercent();
+          totalDfsUsed += dn.getDfsUsedPercent();
+        }
+        totalDfsUsed /= live.length;
+        Arrays.sort(usages);
+        median = usages[usages.length / 2];
+        max = usages[usages.length - 1];
+        min = usages[0];
+
+        for (i = 0; i < usages.length; i++) {
+          dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed);
+        }
+        dev = (float) Math.sqrt(dev / usages.length);
+      }
+    } catch (IOException e) {
+      LOG.info("Cannot get the live nodes: {}", e.getMessage());
+    }
+
+    final Map<String, Object> innerInfo = new HashMap<>();
+    innerInfo.put("min", StringUtils.format("%.2f%%", min));
+    innerInfo.put("median", StringUtils.format("%.2f%%", median));
+    innerInfo.put("max", StringUtils.format("%.2f%%", max));
+    innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev));
+    info.put("nodeUsage", innerInfo);
+
+    return JSON.toString(info);
+  }
+
+  @Override
+  public long getNumBlocks() {
+    return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks);
+  }
+
+  @Override
+  public long getNumOfMissingBlocks() {
+    return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing);
+  }
+
+  @Override
+  public long getNumOfBlocksPendingReplication() {
+    return getNameserviceAggregatedLong(
+        MembershipStats::getNumOfBlocksPendingReplication);
+  }
+
+  @Override
+  public long getNumOfBlocksUnderReplicated() {
+    return getNameserviceAggregatedLong(
+        MembershipStats::getNumOfBlocksUnderReplicated);
+  }
+
+  @Override
+  public long getNumOfBlocksPendingDeletion() {
+    return getNameserviceAggregatedLong(
+        MembershipStats::getNumOfBlocksPendingDeletion);
+  }
+
+  @Override
+  public long getNumFiles() {
+    return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles);
+  }
+
+  @Override
+  public String getRouterStarted() {
+    long startTime = this.router.getStartTime();
+    return new Date(startTime).toString();
+  }
+
+  @Override
+  public String getVersion() {
+    return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
+  }
+
+  @Override
+  public String getCompiledDate() {
+    return VersionInfo.getDate();
+  }
+
+  @Override
+  public String getCompileInfo() {
+    return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
+        + VersionInfo.getBranch();
+  }
+
+  @Override
+  public String getHostAndPort() {
+    // TODO this should be the HTTP address
+    return "Unknown";
+  }
+
+  @Override
+  public String getRouterId() {
+    return this.router.getRouterId();
+  }
+
+  @Override
+  public String getClusterId() {
+    try {
+      Collection<String> clusterIds =
+          getNamespaceInfo(FederationNamespaceInfo::getClusterId);
+      return clusterIds.toString();
+    } catch (IOException e) {
+      LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage());
+      return "";
+    }
+  }
+
+  @Override
+  public String getBlockPoolId() {
+    try {
+      Collection<String> blockpoolIds =
+          getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId);
+      return blockpoolIds.toString();
+    } catch (IOException e) {
+      LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage());
+      return "";
+    }
+  }
+
+  /**
+   * Build a set of unique values found in all namespaces.
+   *
+   * @param f Method reference of the appropriate FederationNamespaceInfo
+   *          getter function
+   * @return Set of unique string values found in all discovered namespaces.
+   * @throws IOException if the query could not be executed.
+   */
+  private Collection<String> getNamespaceInfo(
+      Function<FederationNamespaceInfo, String> f) throws IOException {
+    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+    GetNamespaceInfoResponse response =
+        membershipStore.getNamespaceInfo(request);
+    return response.getNamespaceInfo().stream()
+      .map(f)
+      .collect(Collectors.toSet());
+  }
+
+  /**
+   * Get the aggregated value for a method for all nameservices.
+   * @param f Method reference
+   * @return Aggregated integer.
+   */
+  private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) {
+    try {
+      return getActiveNamenodeRegistrations().stream()
+               .map(MembershipState::getStats)
+               .collect(Collectors.summingInt(f));
+    } catch (IOException e) {
+      LOG.error("Unable to extract metrics: {}", e.getMessage());
+      return 0;
+    }
+  }
+
+  /**
+   * Get the aggregated value for a method for all nameservices.
+   * @param f Method reference
+   * @return Aggregated long.
+   */
+  private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) {
+    try {
+      return getActiveNamenodeRegistrations().stream()
+               .map(MembershipState::getStats)
+               .collect(Collectors.summingLong(f));
+    } catch (IOException e) {
+      LOG.error("Unable to extract metrics: {}", e.getMessage());
+      return 0;
+    }
+  }
+
+  /**
+   * Fetches the most active namenode memberships for all known nameservices.
+   * The fetched membership may or may not be active. Excludes expired
+   * memberships.
+   * @throws IOException if the query could not be performed.
+   * @return List of the most active NNs from each known nameservice.
+   */
+  private List<MembershipState> getActiveNamenodeRegistrations()
+      throws IOException {
+
+    List<MembershipState> resultList = new ArrayList<>();
+    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+    GetNamespaceInfoResponse response =
+        membershipStore.getNamespaceInfo(request);
+    for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) {
+      // Fetch the most recent namenode registration
+      String nsId = nsInfo.getNameserviceId();
+      List<? extends FederationNamenodeContext> nns =
+          namenodeResolver.getNamenodesForNameserviceId(nsId);
+      if (nns != null) {
+        FederationNamenodeContext nn = nns.get(0);
+        if (nn != null && nn instanceof MembershipState) {
+          resultList.add((MembershipState) nn);
+        }
+      }
+    }
+    return resultList;
+  }
+
+  /**
+   * Get time as a date string.
+   * @param time Milliseconds since the epoch.
+   * @return String representing the date.
+   */
+  private static String getDateString(long time) {
+    if (time <= 0) {
+      return "-";
+    }
+    Date date = new Date(time);
+
+    SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+    return sdf.format(date);
+  }
+
+  /**
+   * Get the number of seconds passed since a date.
+   *
+   * @param timeMs Time in milliseconds to use as a reference.
+   * @return Seconds since the date.
+   */
+  private static long getSecondsSince(long timeMs) {
+    if (timeMs < 0) {
+      return -1;
+    }
+    return (now() - timeMs) / 1000;
+  }
+
+  /**
+   * Get JSON for this record.
+   *
+   * @return Map representing the data for the JSON representation.
+   */
+  private static Map<String, Object> getJson(BaseRecord record) {
+    Map<String, Object> json = new HashMap<>();
+    Map<String, Class<?>> fields = getFields(record);
+
+    for (String fieldName : fields.keySet()) {
+      if (!fieldName.equalsIgnoreCase("proto")) {
+        try {
+          Object value = getField(record, fieldName);
+          if (value instanceof BaseRecord) {
+            BaseRecord recordField = (BaseRecord) value;
+            json.putAll(getJson(recordField));
+          } else {
+            json.put(fieldName, value == null ? JSONObject.NULL : value);
+          }
+        } catch (Exception e) {
+          throw new IllegalArgumentException(
+              "Cannot serialize field " + fieldName + " into JSON");
+        }
+      }
+    }
+    return json;
+  }
+
+  /**
+   * Returns all serializable fields in the object.
+   *
+   * @return Map with the fields.
+   */
+  private static Map<String, Class<?>> getFields(BaseRecord record) {
+    Map<String, Class<?>> getters = new HashMap<>();
+    for (Method m : record.getClass().getDeclaredMethods()) {
+      if (m.getName().startsWith("get")) {
+        try {
+          Class<?> type = m.getReturnType();
+          char[] c = m.getName().substring(3).toCharArray();
+          c[0] = Character.toLowerCase(c[0]);
+          String key = new String(c);
+          getters.put(key, type);
+        } catch (Exception e) {
+          LOG.error("Cannot execute getter {} on {}", m.getName(), record);
+        }
+      }
+    }
+    return getters;
+  }
+
+  /**
+   * Fetches the value for a field name.
+   *
+   * @param fieldName the legacy name of the field.
+   * @return The field data or null if not found.
+   */
+  private static Object getField(BaseRecord record, String fieldName) {
+    Object result = null;
+    Method m = locateGetter(record, fieldName);
+    if (m != null) {
+      try {
+        result = m.invoke(record);
+      } catch (Exception e) {
+        LOG.error("Cannot get field {} on {}", fieldName, record);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Finds the appropriate getter for a field name.
+   *
+   * @param fieldName The legacy name of the field.
+   * @return The matching getter or null if not found.
+   */
+  private static Method locateGetter(BaseRecord record, String fieldName) {
+    for (Method m : record.getClass().getMethods()) {
+      if (m.getName().equalsIgnoreCase("get" + fieldName)) {
+        return m;
+      }
+    }
+    return null;
+  }
+}

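For reference, getNodeUsage() above serializes to a nested JSON object keyed by "nodeUsage"; a representative (illustrative, not captured) output:

    {"nodeUsage":{"max":"57.80%","median":"12.35%","min":"0.00%","stdDev":"8.21%"}}
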
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
new file mode 100644
index 0000000..00209e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * JMX interface for the RPC server.
+ * TODO use the default RPC MBean.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FederationRPCMBean {
+
+  /** Number of operations the Router proxied to a Namenode. */
+  long getProxyOps();
+
+  /** Average time in milliseconds to proxy an operation to a Namenode. */
+  double getProxyAvg();
+
+  /** Number of operations the Router processed internally. */
+  long getProcessingOps();
+
+  /** Average time in milliseconds to process an operation internally. */
+  double getProcessingAvg();
+
+  /** Number of operations that failed to reach the Namenode. */
+  long getProxyOpFailureCommunicate();
+
+  /** Number of operations that hit a standby Namenode. */
+  long getProxyOpFailureStandby();
+
+  /** Number of operations not implemented by the Router. */
+  long getProxyOpNotImplemented();
+
+  /** Number of requests failed because the State Store was unavailable. */
+  long getRouterFailureStateStoreOps();
+
+  /** Number of requests failed because the mount point is read only. */
+  long getRouterFailureReadOnlyOps();
+
+  /** Number of requests failed because the path is locked. */
+  long getRouterFailureLockedOps();
+
+  /** Number of requests failed because the Router is in safe mode. */
+  long getRouterFailureSafemodeOps();
+
+  /** Length of the RPC server call queue. */
+  int getRpcServerCallQueue();
+
+  /**
+   * Get the number of RPC connections between the clients and the Router.
+   * @return Number of RPC connections between the clients and the Router.
+   */
+  int getRpcServerNumOpenConnections();
+
+  /**
+   * Get the number of RPC connections between the Router and the NNs.
+   * @return Number of RPC connections between the Router and the NNs.
+   */
+  int getRpcClientNumConnections();
+
+  /**
+   * Get the number of active RPC connections between the Router and the NNs.
+   * @return Number of active RPC connections between the Router and the NNs.
+   */
+  int getRpcClientNumActiveConnections();
+
+  /**
+   * Get the number of RPC connections being created.
+   * @return Number of RPC connections being created.
+   */
+  int getRpcClientNumCreatingConnections();
+
+  /**
+   * Get the number of connection pools between the Router and the NNs.
+   * @return Number of connection pools between the Router and the NNs.
+   */
+  int getRpcClientNumConnectionPools();
+
+  /**
+   * JSON representation of the RPC connections from the Router to the NNs.
+   * @return JSON string representation.
+   */
+  String getRpcClientConnections();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
new file mode 100644
index 0000000..427bca2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * Implementation of the RPC metrics collector.
+ */
+@Metrics(name = "RouterRPCActivity", about = "Router RPC Activity",
+    context = "router")
+public class FederationRPCMetrics implements FederationRPCMBean {
+
+  private final MetricsRegistry registry = new MetricsRegistry("router");
+
+  private RouterRpcServer rpcServer;
+
+  @Metric("Time for the router to process an operation internally")
+  private MutableRate processing;
+  @Metric("Number of operations the Router processed internally")
+  private MutableCounterLong processingOp;
+  @Metric("Time for the Router to proxy an operation to the Namenodes")
+  private MutableRate proxy;
+  @Metric("Number of operations the Router proxied to a Namenode")
+  private MutableCounterLong proxyOp;
+
+  @Metric("Number of operations to fail to reach NN")
+  private MutableCounterLong proxyOpFailureStandby;
+  @Metric("Number of operations to hit a standby NN")
+  private MutableCounterLong proxyOpFailureCommunicate;
+  @Metric("Number of operations not implemented")
+  private MutableCounterLong proxyOpNotImplemented;
+
+  @Metric("Failed requests due to State Store unavailable")
+  private MutableCounterLong routerFailureStateStore;
+  @Metric("Failed requests due to read only mount point")
+  private MutableCounterLong routerFailureReadOnly;
+  @Metric("Failed requests due to locked path")
+  private MutableCounterLong routerFailureLocked;
+  @Metric("Failed requests due to safe mode")
+  private MutableCounterLong routerFailureSafemode;
+
+  public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) {
+    this.rpcServer = rpcServer;
+
+    registry.tag(SessionId, "RouterRPCSession");
+    registry.tag(ProcessName, "Router");
+  }
+
+  public static FederationRPCMetrics create(Configuration conf,
+      RouterRpcServer rpcServer) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(FederationRPCMetrics.class.getName(),
+        "HDFS Federation RPC Metrics",
+        new FederationRPCMetrics(conf, rpcServer));
+  }
+
+  /**
+   * Convert nanoseconds to milliseconds.
+   * @param ns Time in nanoseconds.
+   * @return Time in milliseconds.
+   */
+  private static double toMs(double ns) {
+    return ns / 1000000;
+  }
+
+  /**
+   * Reset the metrics system.
+   */
+  public static void reset() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(FederationRPCMetrics.class.getName());
+  }
+
+  public void incrProxyOpFailureStandby() {
+    proxyOpFailureStandby.incr();
+  }
+
+  @Override
+  public long getProxyOpFailureStandby() {
+    return proxyOpFailureStandby.value();
+  }
+
+  public void incrProxyOpFailureCommunicate() {
+    proxyOpFailureCommunicate.incr();
+  }
+
+  @Override
+  public long getProxyOpFailureCommunicate() {
+    return proxyOpFailureCommunicate.value();
+  }
+
+
+  public void incrProxyOpNotImplemented() {
+    proxyOpNotImplemented.incr();
+  }
+
+  @Override
+  public long getProxyOpNotImplemented() {
+    return proxyOpNotImplemented.value();
+  }
+
+  public void incrRouterFailureStateStore() {
+    routerFailureStateStore.incr();
+  }
+
+  @Override
+  public long getRouterFailureStateStoreOps() {
+    return routerFailureStateStore.value();
+  }
+
+  public void incrRouterFailureSafemode() {
+    routerFailureSafemode.incr();
+  }
+
+  @Override
+  public long getRouterFailureSafemodeOps() {
+    return routerFailureSafemode.value();
+  }
+
+  public void incrRouterFailureReadOnly() {
+    routerFailureReadOnly.incr();
+  }
+
+  @Override
+  public long getRouterFailureReadOnlyOps() {
+    return routerFailureReadOnly.value();
+  }
+
+  public void incrRouterFailureLocked() {
+    routerFailureLocked.incr();
+  }
+
+  @Override
+  public long getRouterFailureLockedOps() {
+    return routerFailureLocked.value();
+  }
+
+  @Override
+  public int getRpcServerCallQueue() {
+    return rpcServer.getServer().getCallQueueLen();
+  }
+
+  @Override
+  public int getRpcServerNumOpenConnections() {
+    return rpcServer.getServer().getNumOpenConnections();
+  }
+
+  @Override
+  public int getRpcClientNumConnections() {
+    return rpcServer.getRPCClient().getNumConnections();
+  }
+
+  @Override
+  public int getRpcClientNumActiveConnections() {
+    return rpcServer.getRPCClient().getNumActiveConnections();
+  }
+
+  @Override
+  public int getRpcClientNumCreatingConnections() {
+    return rpcServer.getRPCClient().getNumCreatingConnections();
+  }
+
+  @Override
+  public int getRpcClientNumConnectionPools() {
+    return rpcServer.getRPCClient().getNumConnectionPools();
+  }
+
+  @Override
+  public String getRpcClientConnections() {
+    return rpcServer.getRPCClient().getJSON();
+  }
+
+  /**
+   * Add the time to proxy an operation from the moment the Router sends it
+   * to the Namenode until the Namenode replies.
+   * @param time Proxy time of an operation in nanoseconds.
+   */
+  public void addProxyTime(long time) {
+    proxy.add(time);
+    proxyOp.incr();
+  }
+
+  @Override
+  public double getProxyAvg() {
+    return toMs(proxy.lastStat().mean());
+  }
+
+  @Override
+  public long getProxyOps() {
+    return proxyOp.value();
+  }
+
+  /**
+   * Add the time to process a request in the Router from the time we receive
+   * the call until we send it to the Namenode.
+   * @param time Process time of an operation in nanoseconds.
+   */
+  public void addProcessingTime(long time) {
+    processing.add(time);
+    processingOp.incr();
+  }
+
+  @Override
+  public double getProcessingAvg() {
+    return toMs(processing.lastStat().mean());
+  }
+
+  @Override
+  public long getProcessingOps() {
+    return processingOp.value();
+  }
+}
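
The two rates above are fed in nanoseconds and only converted by toMs() when
the averages are read. A minimal usage sketch (not part of the patch;
"metrics" stands for an instance obtained from FederationRPCMetrics.create()):

    long received = System.nanoTime();      // call arrives at the Router
    // ... resolve the mount table entry and pick a Namenode ...
    long forwarded = System.nanoTime();     // call leaves for the Namenode
    metrics.addProcessingTime(forwarded - received);
    // ... remote ClientProtocol invocation ...
    metrics.addProxyTime(System.nanoTime() - forwarded);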

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
new file mode 100644
index 0000000..e3a16b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Customizable RPC performance monitor. Receives events from the RPC server
+ * and aggregates them via JMX.
+ */
+public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationRPCPerformanceMonitor.class);
+
+
+  /** Time when an operation was received by the Router. */
+  private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
+  /** Time when an operation was sent to the Namenode. */
+  private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();
+
+  /** Configuration for the performance monitor. */
+  private Configuration conf;
+  /** RPC server for the Router. */
+  private RouterRpcServer server;
+  /** State Store. */
+  private StateStoreService store;
+
+  /** JMX interface to monitor the RPC metrics. */
+  private FederationRPCMetrics metrics;
+  private ObjectName registeredBean;
+
+  /** Thread pool for logging stats. */
+  private ExecutorService executor;
+
+
+  @Override
+  public void init(Configuration configuration, RouterRpcServer rpcServer,
+      StateStoreService stateStore) {
+
+    this.conf = configuration;
+    this.server = rpcServer;
+    this.store = stateStore;
+
+    // Create metrics
+    this.metrics = FederationRPCMetrics.create(conf, server);
+
+    // Create thread pool
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Federation RPC Performance Monitor-%d").build();
+    this.executor = Executors.newFixedThreadPool(1, threadFactory);
+
+    // Adding JMX interface
+    try {
+      StandardMBean bean =
+          new StandardMBean(this.metrics, FederationRPCMBean.class);
+      registeredBean = MBeans.register("Router", "FederationRPC", bean);
+      LOG.info("Registered FederationRPCMBean: {}", registeredBean);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad FederationRPCMBean setup", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (registeredBean != null) {
+      MBeans.unregister(registeredBean);
+      registeredBean = null;
+    }
+    if (this.executor != null) {
+      this.executor.shutdown();
+    }
+  }
+
+  /**
+   * Resets all RPC service performance counters to their defaults.
+   */
+  public void resetPerfCounters() {
+    if (registeredBean != null) {
+      MBeans.unregister(registeredBean);
+      registeredBean = null;
+    }
+    if (metrics != null) {
+      FederationRPCMetrics.reset();
+      metrics = null;
+    }
+    init(conf, server, store);
+  }
+
+  @Override
+  public void startOp() {
+    START_TIME.set(this.getNow());
+  }
+
+  @Override
+  public long proxyOp() {
+    PROXY_TIME.set(this.getNow());
+    long processingTime = getProcessingTime();
+    if (processingTime >= 0) {
+      metrics.addProcessingTime(processingTime);
+    }
+    return Thread.currentThread().getId();
+  }
+
+  @Override
+  public void proxyOpComplete(boolean success) {
+    if (success) {
+      long proxyTime = getProxyTime();
+      if (proxyTime >= 0) {
+        metrics.addProxyTime(proxyTime);
+      }
+    }
+  }
+
+  @Override
+  public void proxyOpFailureStandby() {
+    metrics.incrProxyOpFailureStandby();
+  }
+
+  @Override
+  public void proxyOpFailureCommunicate() {
+    metrics.incrProxyOpFailureCommunicate();
+  }
+
+  @Override
+  public void proxyOpNotImplemented() {
+    metrics.incrProxyOpNotImplemented();
+  }
+
+  @Override
+  public void routerFailureStateStore() {
+    metrics.incrRouterFailureStateStore();
+  }
+
+  @Override
+  public void routerFailureSafemode() {
+    metrics.incrRouterFailureSafemode();
+  }
+
+  @Override
+  public void routerFailureReadOnly() {
+    metrics.incrRouterFailureReadOnly();
+  }
+
+  @Override
+  public void routerFailureLocked() {
+    metrics.incrRouterFailureLocked();
+  }
+
+  /**
+   * Get current time.
+   * @return Current time in nanoseconds.
+   */
+  private long getNow() {
+    return System.nanoTime();
+  }
+
+  /**
+   * Get the time between receiving the operation and sending it to the Namenode.
+   * @return Processing time in nanoseconds.
+   */
+  private long getProcessingTime() {
+    if (START_TIME.get() != null && START_TIME.get() > 0 &&
+        PROXY_TIME.get() != null && PROXY_TIME.get() > 0) {
+      return PROXY_TIME.get() - START_TIME.get();
+    }
+    return -1;
+  }
+
+  /**
+   * Get the time between now and when the operation was forwarded to the Namenode.
+   * @return Current proxy time in nanoseconds.
+   */
+  private long getProxyTime() {
+    if (PROXY_TIME.get() != null && PROXY_TIME.get() > 0) {
+      return getNow() - PROXY_TIME.get();
+    }
+    return -1;
+  }
+}
\ No newline at end of file
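
Sketch of how an RPC server is expected to drive these callbacks for one
proxied operation (not part of the patch; the forwarding call is a
placeholder):

    monitor.startOp();                // sets START_TIME for this handler thread
    // ... Router-side processing: resolve the path, pick a subcluster ...
    monitor.proxyOp();                // sets PROXY_TIME, records processing time
    try {
      // ... forward the operation to the active Namenode ...
      monitor.proxyOpComplete(true);  // records the proxy time on success
    } catch (IOException e) {
      monitor.proxyOpFailureCommunicate();
      monitor.proxyOpComplete(false); // failed proxies are not timed
    }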

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
new file mode 100644
index 0000000..23cd675
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -0,0 +1,624 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeMXBean;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeStatusMXBean;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Expose the Namenode metrics as if the Router were one.
+ */
+public class NamenodeBeanMetrics
+    implements FSNamesystemMBean, NameNodeMXBean, NameNodeStatusMXBean {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NamenodeBeanMetrics.class);
+
+  private final Router router;
+
+  /** FSNamesystem bean. */
+  private ObjectName fsBeanName;
+  /** FSNamesystemState bean. */
+  private ObjectName fsStateBeanName;
+  /** NameNodeInfo bean. */
+  private ObjectName nnInfoBeanName;
+  /** NameNodeStatus bean. */
+  private ObjectName nnStatusBeanName;
+
+
+  public NamenodeBeanMetrics(Router router) {
+    this.router = router;
+
+    try {
+      // TODO this needs to be done with the Metrics from FSNamesystem
+      StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
+      this.fsBeanName = MBeans.register("NameNode", "FSNamesystem", bean);
+      LOG.info("Registered FSNamesystem MBean: {}", this.fsBeanName);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad FSNamesystem MBean setup", e);
+    }
+
+    try {
+      StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
+      this.fsStateBeanName =
+          MBeans.register("NameNode", "FSNamesystemState", bean);
+      LOG.info("Registered FSNamesystemState MBean: {}", this.fsStateBeanName);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad FSNamesystemState MBean setup", e);
+    }
+
+    try {
+      StandardMBean bean = new StandardMBean(this, NameNodeMXBean.class);
+      this.nnInfoBeanName = MBeans.register("NameNode", "NameNodeInfo", bean);
+      LOG.info("Registered NameNodeInfo MBean: {}", this.nnInfoBeanName);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad NameNodeInfo MBean setup", e);
+    }
+
+    try {
+      StandardMBean bean = new StandardMBean(this, NameNodeStatusMXBean.class);
+      this.nnStatusBeanName =
+          MBeans.register("NameNode", "NameNodeStatus", bean);
+      LOG.info("Registered NameNodeStatus MBean: {}", this.nnStatusBeanName);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
+    }
+  }
+
+  /**
+   * De-register the JMX interfaces.
+   */
+  public void close() {
+    if (fsBeanName != null) {
+      MBeans.unregister(fsBeanName);
+      fsBeanName = null;
+    }
+    if (fsStateBeanName != null) {
+      MBeans.unregister(fsStateBeanName);
+      fsStateBeanName = null;
+    }
+    if (nnInfoBeanName != null) {
+      MBeans.unregister(nnInfoBeanName);
+      nnInfoBeanName = null;
+    }
+    // Remove the NameNode status bean
+    if (nnStatusBeanName != null) {
+      MBeans.unregister(nnStatusBeanName);
+      nnStatusBeanName = null;
+    }
+  }
+
+  private FederationMetrics getFederationMetrics() {
+    return this.router.getMetrics();
+  }
+
+  /////////////////////////////////////////////////////////
+  // NameNodeMXBean
+  /////////////////////////////////////////////////////////
+
+  @Override
+  public String getVersion() {
+    return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
+  }
+
+  @Override
+  public String getSoftwareVersion() {
+    return VersionInfo.getVersion();
+  }
+
+  @Override
+  public long getUsed() {
+    return getFederationMetrics().getUsedCapacity();
+  }
+
+  @Override
+  public long getFree() {
+    return getFederationMetrics().getRemainingCapacity();
+  }
+
+  @Override
+  public long getTotal() {
+    return getFederationMetrics().getTotalCapacity();
+  }
+
+  @Override
+  public String getSafemode() {
+    // We assume that the global federated view is never in safe mode
+    return "";
+  }
+
+  @Override
+  public boolean isUpgradeFinalized() {
+    // We assume the upgrade is always finalized in a federated view
+    return true;
+  }
+
+  @Override
+  public RollingUpgradeInfo.Bean getRollingUpgradeStatus() {
+    return null;
+  }
+
+  @Override
+  public long getNonDfsUsedSpace() {
+    return 0;
+  }
+
+  @Override
+  public float getPercentUsed() {
+    return DFSUtilClient.getPercentUsed(getCapacityUsed(), getCapacityTotal());
+  }
+
+  @Override
+  public float getPercentRemaining() {
+    return DFSUtilClient.getPercentUsed(
+        getCapacityRemaining(), getCapacityTotal());
+  }
+
+  @Override
+  public long getCacheUsed() {
+    return 0;
+  }
+
+  @Override
+  public long getCacheCapacity() {
+    return 0;
+  }
+
+  @Override
+  public long getBlockPoolUsedSpace() {
+    return 0;
+  }
+
+  @Override
+  public float getPercentBlockPoolUsed() {
+    return 0;
+  }
+
+  @Override
+  public long getTotalBlocks() {
+    return getFederationMetrics().getNumBlocks();
+  }
+
+  @Override
+  public long getNumberOfMissingBlocks() {
+    return getFederationMetrics().getNumOfMissingBlocks();
+  }
+
+  @Override
+  @Deprecated
+  public long getPendingReplicationBlocks() {
+    return getFederationMetrics().getNumOfBlocksPendingReplication();
+  }
+
+  @Override
+  public long getPendingReconstructionBlocks() {
+    return getFederationMetrics().getNumOfBlocksPendingReplication();
+  }
+
+  @Override
+  @Deprecated
+  public long getUnderReplicatedBlocks() {
+    return getFederationMetrics().getNumOfBlocksUnderReplicated();
+  }
+
+  @Override
+  public long getLowRedundancyBlocks() {
+    return getFederationMetrics().getNumOfBlocksUnderReplicated();
+  }
+
+  @Override
+  public long getPendingDeletionBlocks() {
+    return getFederationMetrics().getNumOfBlocksPendingDeletion();
+  }
+
+  @Override
+  public long getScheduledReplicationBlocks() {
+    return -1;
+  }
+
+  @Override
+  public long getNumberOfMissingBlocksWithReplicationFactorOne() {
+    return 0;
+  }
+
+  @Override
+  public String getCorruptFiles() {
+    return "N/A";
+  }
+
+  @Override
+  public int getThreads() {
+    return ManagementFactory.getThreadMXBean().getThreadCount();
+  }
+
+  @Override
+  public String getLiveNodes() {
+    return this.getNodes(DatanodeReportType.LIVE);
+  }
+
+  @Override
+  public String getDeadNodes() {
+    return this.getNodes(DatanodeReportType.DEAD);
+  }
+
+  @Override
+  public String getDecomNodes() {
+    return this.getNodes(DatanodeReportType.DECOMMISSIONING);
+  }
+
+  /**
+   * Get all the nodes in the federation of a particular type.
+   * TODO this is expensive, we may want to cache it.
+   * @param type Type of the datanodes to check.
+   * @return JSON with the nodes.
+   */
+  private String getNodes(DatanodeReportType type) {
+    final Map<String, Map<String, Object>> info = new HashMap<>();
+    try {
+      RouterRpcServer rpcServer = this.router.getRpcServer();
+      DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
+      for (DatanodeInfo node : datanodes) {
+        Map<String, Object> innerinfo = new HashMap<>();
+        innerinfo.put("infoAddr", node.getInfoAddr());
+        innerinfo.put("infoSecureAddr", node.getInfoSecureAddr());
+        innerinfo.put("xferaddr", node.getXferAddr());
+        innerinfo.put("location", node.getNetworkLocation());
+        innerinfo.put("lastContact", getLastContact(node));
+        innerinfo.put("usedSpace", node.getDfsUsed());
+        innerinfo.put("adminState", node.getAdminState().toString());
+        innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed());
+        innerinfo.put("capacity", node.getCapacity());
+        innerinfo.put("numBlocks", -1); // node.numBlocks()
+        innerinfo.put("version", (node.getSoftwareVersion() == null ?
+                        "UNKNOWN" : node.getSoftwareVersion()));
+        innerinfo.put("used", node.getDfsUsed());
+        innerinfo.put("remaining", node.getRemaining());
+        innerinfo.put("blockScheduled", -1); // node.getBlocksScheduled()
+        innerinfo.put("blockPoolUsed", node.getBlockPoolUsed());
+        innerinfo.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent());
+        innerinfo.put("volfails", -1); // node.getVolumeFailures()
+        info.put(node.getHostName() + ":" + node.getXferPort(),
+            Collections.unmodifiableMap(innerinfo));
+      }
+    } catch (StandbyException e) {
+      LOG.error("Cannot get {} nodes, Router in safe mode", type);
+    } catch (IOException e) {
+      LOG.error("Cannot get " + type + " nodes", e);
+    }
+    return JSON.toString(info);
+  }
+
+  @Override
+  public String getClusterId() {
+    try {
+      return getNamespaceInfo(FederationNamespaceInfo::getClusterId).toString();
+    } catch (IOException e) {
+      LOG.error("Cannot fetch cluster ID metrics {}", e.getMessage());
+      return "";
+    }
+  }
+
+  @Override
+  public String getBlockPoolId() {
+    try {
+      return
+          getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId).toString();
+    } catch (IOException e) {
+      LOG.error("Cannot fetch block pool ID metrics {}", e.getMessage());
+      return "";
+    }
+  }
+
+  /**
+   * Build a set of unique values found in all namespaces.
+   *
+   * @param f Method reference of the appropriate FederationNamespaceInfo
+   *          getter function
+   * @return Set of unique string values found in all discovered namespaces.
+   * @throws IOException if the query could not be executed.
+   */
+  private Collection<String> getNamespaceInfo(
+      Function<FederationNamespaceInfo, String> f) throws IOException {
+    StateStoreService stateStore = router.getStateStore();
+    MembershipStore membershipStore =
+        stateStore.getRegisteredRecordStore(MembershipStore.class);
+
+    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+    GetNamespaceInfoResponse response =
+        membershipStore.getNamespaceInfo(request);
+    return response.getNamespaceInfo().stream()
+      .map(f)
+      .collect(Collectors.toSet());
+  }
+
+  @Override
+  public String getNameDirStatuses() {
+    return "N/A";
+  }
+
+  @Override
+  public String getNodeUsage() {
+    return "N/A";
+  }
+
+  @Override
+  public String getNameJournalStatus() {
+    return "N/A";
+  }
+
+  @Override
+  public String getJournalTransactionInfo() {
+    return "N/A";
+  }
+
+  @Override
+  public long getNNStartedTimeInMillis() {
+    return this.router.getStartTime();
+  }
+
+  @Override
+  public String getCompileInfo() {
+    return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
+        " from " + VersionInfo.getBranch();
+  }
+
+  @Override
+  public int getDistinctVersionCount() {
+    return 0;
+  }
+
+  @Override
+  public Map<String, Integer> getDistinctVersions() {
+    return null;
+  }
+
+  /////////////////////////////////////////////////////////
+  // FSNamesystemMBean
+  /////////////////////////////////////////////////////////
+
+  @Override
+  public String getFSState() {
+    // We assume it is not in safe mode
+    return "Operational";
+  }
+
+  @Override
+  public long getBlocksTotal() {
+    return this.getTotalBlocks();
+  }
+
+  @Override
+  public long getCapacityTotal() {
+    return this.getTotal();
+  }
+
+  @Override
+  public long getCapacityRemaining() {
+    return this.getFree();
+  }
+
+  @Override
+  public long getCapacityUsed() {
+    return this.getUsed();
+  }
+
+  @Override
+  public long getFilesTotal() {
+    return getFederationMetrics().getNumFiles();
+  }
+
+  @Override
+  public int getTotalLoad() {
+    return -1;
+  }
+
+  @Override
+  public int getNumLiveDataNodes() {
+    return this.router.getMetrics().getNumLiveNodes();
+  }
+
+  @Override
+  public int getNumDeadDataNodes() {
+    return this.router.getMetrics().getNumDeadNodes();
+  }
+
+  @Override
+  public int getNumStaleDataNodes() {
+    return -1;
+  }
+
+  @Override
+  public int getNumDecomLiveDataNodes() {
+    return this.router.getMetrics().getNumDecomLiveNodes();
+  }
+
+  @Override
+  public int getNumDecomDeadDataNodes() {
+    return this.router.getMetrics().getNumDecomDeadNodes();
+  }
+
+  @Override
+  public int getNumDecommissioningDataNodes() {
+    return this.router.getMetrics().getNumDecommissioningNodes();
+  }
+
+  @Override
+  public int getNumInMaintenanceLiveDataNodes() {
+    return 0;
+  }
+
+  @Override
+  public int getNumInMaintenanceDeadDataNodes() {
+    return 0;
+  }
+
+  @Override
+  public int getNumEnteringMaintenanceDataNodes() {
+    return 0;
+  }
+
+  @Override
+  public int getVolumeFailuresTotal() {
+    return 0;
+  }
+
+  @Override
+  public long getEstimatedCapacityLostTotal() {
+    return 0;
+  }
+
+  @Override
+  public String getSnapshotStats() {
+    return null;
+  }
+
+  @Override
+  public long getMaxObjects() {
+    return 0;
+  }
+
+  @Override
+  public long getBlockDeletionStartTime() {
+    return -1;
+  }
+
+  @Override
+  public int getNumStaleStorages() {
+    return -1;
+  }
+
+  @Override
+  public String getTopUserOpCounts() {
+    return "N/A";
+  }
+
+  @Override
+  public int getFsLockQueueLength() {
+    return 0;
+  }
+
+  @Override
+  public long getTotalSyncCount() {
+    return 0;
+  }
+
+  @Override
+  public String getTotalSyncTimes() {
+    return "";
+  }
+
+  private long getLastContact(DatanodeInfo node) {
+    return (now() - node.getLastUpdate()) / 1000;
+  }
+
+  /////////////////////////////////////////////////////////
+  // NameNodeStatusMXBean
+  /////////////////////////////////////////////////////////
+
+  @Override
+  public String getNNRole() {
+    return NamenodeRole.NAMENODE.toString();
+  }
+
+  @Override
+  public String getState() {
+    return HAServiceState.ACTIVE.toString();
+  }
+
+  @Override
+  public String getHostAndPort() {
+    return NetUtils.getHostPortString(router.getRpcServerAddress());
+  }
+
+  @Override
+  public boolean isSecurityEnabled() {
+    return false;
+  }
+
+  @Override
+  public long getLastHATransitionTime() {
+    return 0;
+  }
+
+  @Override
+  public long getBytesWithFutureGenerationStamps() {
+    return 0;
+  }
+
+  @Override
+  public String getSlowPeersReport() {
+    return "N/A";
+  }
+
+  @Override
+  public String getSlowDisksReport() {
+    return "N/A";
+  }
+
+  @Override
+  public long getNumberOfSnapshottableDirs() {
+    return 0;
+  }
+
+  @Override
+  public String getEnteringMaintenanceNodes() {
+    return "N/A";
+  }
+
+  @Override
+  public String getNameDirSize() {
+    return "N/A";
+  }
+
+  @Override
+  public int getNumEncryptionZones() {
+    return 0;
+  }
+}
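
Because the beans reuse the standard NameNode JMX names, existing monitoring
tools can point at a Router unchanged. A small sketch reading one of them
(not part of the patch; assumes Hadoop's usual "Hadoop:service=...,name=..."
naming used by MBeans.register, run in a method declaring throws Exception):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName nnInfo =
        new ObjectName("Hadoop:service=NameNode,name=NameNodeInfo");
    // Served by getLiveNodes() above: a JSON map keyed by host:xferPort
    // covering the datanodes of every subcluster.
    String liveNodes = (String) mbs.getAttribute(nnInfo, "LiveNodes");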

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java
new file mode 100644
index 0000000..5e4ccab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * JMX interface for the State Store metrics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface StateStoreMBean {
+
+  long getReadOps();
+
+  double getReadAvg();
+
+  long getWriteOps();
+
+  double getWriteAvg();
+
+  long getFailureOps();
+
+  double getFailureAvg();
+
+  long getRemoveOps();
+
+  double getRemoveAvg();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
new file mode 100644
index 0000000..c17eabc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementations of the JMX interface for the State Store metrics.
+ */
+@Metrics(name = "StateStoreActivity", about = "Router metrics",
+    context = "router")
+public final class StateStoreMetrics implements StateStoreMBean {
+
+  private final MetricsRegistry registry = new MetricsRegistry("router");
+
+  @Metric("GET transactions")
+  private MutableRate reads;
+  @Metric("PUT transactions")
+  private MutableRate writes;
+  @Metric("REMOVE transactions")
+  private MutableRate removes;
+  @Metric("Failed transactions")
+  private MutableRate failures;
+
+  private Map<String, MutableGaugeInt> cacheSizes;
+
+  private StateStoreMetrics(Configuration conf) {
+    registry.tag(SessionId, "RouterSession");
+    registry.tag(ProcessName, "Router");
+    cacheSizes = new HashMap<>();
+  }
+
+  public static StateStoreMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(new StateStoreMetrics(conf));
+  }
+
+  public void shutdown() {
+    DefaultMetricsSystem.shutdown();
+    reset();
+  }
+
+  public void addRead(long latency) {
+    reads.add(latency);
+  }
+
+  public long getReadOps() {
+    return reads.lastStat().numSamples();
+  }
+
+  public double getReadAvg() {
+    return reads.lastStat().mean();
+  }
+
+  public void addWrite(long latency) {
+    writes.add(latency);
+  }
+
+  public long getWriteOps() {
+    return writes.lastStat().numSamples();
+  }
+
+  public double getWriteAvg() {
+    return writes.lastStat().mean();
+  }
+
+  public void addFailure(long latency) {
+    failures.add(latency);
+  }
+
+  public long getFailureOps() {
+    return failures.lastStat().numSamples();
+  }
+
+  public double getFailureAvg() {
+    return failures.lastStat().mean();
+  }
+
+  public void addRemove(long latency) {
+    removes.add(latency);
+  }
+
+  public long getRemoveOps() {
+    return removes.lastStat().numSamples();
+  }
+
+  public double getRemoveAvg() {
+    return removes.lastStat().mean();
+  }
+
+  /**
+   * Set the size of the cache for a State Store interface.
+   *
+   * @param name Name of the record to cache.
+   * @param size Number of records.
+   */
+  public void setCacheSize(String name, int size) {
+    String counterName = "Cache" + name + "Size";
+    MutableGaugeInt counter = cacheSizes.get(counterName);
+    if (counter == null) {
+      counter = registry.newGauge(counterName, name, size);
+      cacheSizes.put(counterName, counter);
+    }
+    counter.set(size);
+  }
+
+  @VisibleForTesting
+  public void reset() {
+    reads.resetMinMax();
+    writes.resetMinMax();
+    removes.resetMinMax();
+    failures.resetMinMax();
+  }
+}
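
A short usage sketch for the State Store instrumentation (not part of the
patch; the record name "Membership" is illustrative, and latencies are
assumed to be sampled in milliseconds via org.apache.hadoop.util.Time):

    StateStoreMetrics metrics = StateStoreMetrics.create(conf);

    long start = Time.monotonicNow();
    // ... run a GET against the backing State Store driver ...
    metrics.addRead(Time.monotonicNow() - start);

    // Publishes a gauge named "CacheMembershipSize".
    metrics.setCacheSize("Membership", 12);
    System.out.println(metrics.getReadOps() + " reads, avg "
        + metrics.getReadAvg() + " ms");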

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java
new file mode 100644
index 0000000..c56c823
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Report metrics for Router-based Federation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index d93d498..543d964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
+import org.eclipse.jetty.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -283,6 +285,27 @@ public class ConnectionManager {
   }
 
   /**
+   * Get a JSON representation of the connection pools.
+   *
+   * @return JSON representation of all the connection pools.
+   */
+  public String getJSON() {
+    final Map<String, String> info = new TreeMap<>();
+    readLock.lock();
+    try {
+      for (Entry<ConnectionPoolId, ConnectionPool> entry :
+          this.pools.entrySet()) {
+        ConnectionPoolId connectionPoolId = entry.getKey();
+        ConnectionPool pool = entry.getValue();
+        info.put(connectionPoolId.toString(), pool.getJSON());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return JSON.toString(info);
+  }
+
+  /**
    * Removes stale connections not accessed recently from the pool. This is
    * invoked periodically.
    */

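Illustrative shape of the aggregated string ConnectionManager#getJSON()
returns (and that FederationRPCMetrics#getRpcClientConnections() surfaces);
the pool identifier below is made up:

    // One pool, two connections, one active; the TreeMap keeps keys sorted:
    // {"ns0:user1":"{\"active\":\"1\",\"total\":\"2\"}"}
    String pools = connectionManager.getJSON();
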
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index f76f621..ca113ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.federation.router;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -46,6 +48,7 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
+import org.eclipse.jetty.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -258,6 +261,26 @@ public class ConnectionPool {
   }
 
   /**
+   * JSON representation of the connection pool.
+   *
+   * @return String representation of the JSON.
+   */
+  public String getJSON() {
+    final Map<String, String> info = new LinkedHashMap<>();
+    info.put("active", Integer.toString(getNumActiveConnections()));
+    info.put("total", Integer.toString(getNumConnections()));
+    if (LOG.isDebugEnabled()) {
+      List<ConnectionContext> tmpConnections = this.connections;
+      for (int i=0; i<tmpConnections.size(); i++) {
+        ConnectionContext connection = tmpConnections.get(i);
+        info.put(i + " active", Boolean.toString(connection.isActive()));
+        info.put(i + " closed", Boolean.toString(connection.isClosed()));
+      }
+    }
+    return JSON.toString(info);
+  }
+
+  /**
    * Create a new proxy wrapper for a client NN connection.
    * @return Proxy for the target ClientProtocol that contains the user's
    *         security context.

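With debug logging enabled for the pool, each connection is listed
individually; illustrative output of ConnectionPool#getJSON() (values are
examples):

    // {"active":"1","total":"2",
    //  "0 active":"true","0 closed":"false",
    //  "1 active":"false","1 closed":"false"}
    String pool = connectionPool.getJSON();
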
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d522007c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index fcbd2eb..3ab5e2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -36,10 +36,14 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -96,6 +100,12 @@ public class Router extends CompositeService {
   /** Updates the namenode status in the namenode resolver. */
   private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
 
+  /** Router metrics. */
+  private RouterMetricsService metrics;
+
+  /** JVM pauses (GC and others). */
+  private JvmPauseMonitor pauseMonitor;
+
 
   /** Usage string for help message. */
   private static final String USAGE = "Usage: java Router";
@@ -174,18 +184,46 @@ public class Router extends CompositeService {
       }
     }
 
+    // Router metrics system
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
+        DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) {
+
+      DefaultMetricsSystem.initialize("Router");
+
+      this.metrics = new RouterMetricsService(this);
+      addService(this.metrics);
+
+      // JVM pause monitor
+      this.pauseMonitor = new JvmPauseMonitor();
+      this.pauseMonitor.init(conf);
+    }
+
     super.serviceInit(conf);
   }
 
   @Override
   protected void serviceStart() throws Exception {
 
+    if (this.pauseMonitor != null) {
+      this.pauseMonitor.start();
+      JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
+      if (jvmMetrics != null) {
+        jvmMetrics.setPauseMonitor(pauseMonitor);
+      }
+    }
+
     super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
 
+    // JVM pause monitor
+    if (this.pauseMonitor != null) {
+      this.pauseMonitor.stop();
+    }
+
     super.serviceStop();
   }
 
@@ -419,6 +457,30 @@ public class Router extends CompositeService {
   }
 
   /**
+   * Get the metrics system for the Router.
+   *
+   * @return Router metrics.
+   */
+  public RouterMetrics getRouterMetrics() {
+    if (this.metrics != null) {
+      return this.metrics.getRouterMetrics();
+    }
+    return null;
+  }
+
+  /**
+   * Get the federation metrics.
+   *
+   * @return Federation metrics.
+   */
+  public FederationMetrics getMetrics() {
+    if (this.metrics != null) {
+      return this.metrics.getFederationMetrics();
+    }
+    return null;
+  }
+
+  /**
    * Get the subcluster resolver for files.
    *
    * @return Subcluster resolver for files.

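A sketch of enabling the new metrics on an embedded Router (not part of the
patch; standard CompositeService lifecycle):

    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, true);

    Router router = new Router();
    router.init(conf);   // initializes DefaultMetricsSystem("Router")
    router.start();      // starts the JvmPauseMonitor, wires it to JvmMetrics
    FederationMetrics metrics = router.getMetrics();  // null when disabled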

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org