You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 20:58:16 UTC
[44/45] hadoop git commit: HDFS-12335. Federation Metrics.
Contributed by Inigo Goiri.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
new file mode 100644
index 0000000..851538a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+/**
+ * Maintains the Router activity statistics and publishes them through the
+ * metrics interfaces. The safe mode gauge saturates at Integer.MAX_VALUE.
+ */
+@Metrics(name="RouterActivity", about="Router metrics", context="dfs")
+public class RouterMetrics {
+
+ private final MetricsRegistry registry = new MetricsRegistry("router");
+
+ @Metric("Duration in SafeMode at startup in msec")
+ private MutableGaugeInt safeModeTime;
+
+ private JvmMetrics jvmMetrics = null;
+
+ RouterMetrics(
+ String processName, String sessionId, final JvmMetrics jvmMetrics) {
+ this.jvmMetrics = jvmMetrics;
+ registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+ }
+
+ public static RouterMetrics create(Configuration conf) {
+ String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+ String processName = "Router";
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms);
+
+ return ms.register(new RouterMetrics(processName, sessionId, jm));
+ }
+
+ public JvmMetrics getJvmMetrics() {
+ return jvmMetrics;
+ }
+
+ public void shutdown() {
+ DefaultMetricsSystem.shutdown();
+ }
+
+ public void setSafeModeTime(long elapsed) {
+ safeModeTime.set((int) Math.min(elapsed, Integer.MAX_VALUE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
new file mode 100644
index 0000000..f4debce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Service managing the lifecycle of the Router metrics and JMX interfaces.
+ */
+public class RouterMetricsService extends AbstractService {
+
+ /** Router these metrics belong to. */
+ private final Router router;
+
+ /** Router metrics, created in serviceInit. */
+ private RouterMetrics routerMetrics;
+ /** Federation metrics, created in serviceStart. */
+ private FederationMetrics federationMetrics;
+ /** Namenode mock metrics, created in serviceStart. */
+ private NamenodeBeanMetrics nnMetrics;
+
+
+ public RouterMetricsService(final Router router) {
+ super(RouterMetricsService.class.getName());
+ this.router = router;
+ }
+
+ @Override
+ protected void serviceInit(Configuration configuration) throws Exception {
+ this.routerMetrics = RouterMetrics.create(configuration);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ // Wrapper for all the FSNamesystem JMX interfaces
+ this.nnMetrics = new NamenodeBeanMetrics(this.router);
+
+ // Federation MBean JMX interface
+ this.federationMetrics = new FederationMetrics(this.router);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ // Remove Federation JMX interfaces
+ if (this.federationMetrics != null) {
+ this.federationMetrics.close();
+ }
+
+ // Remove Namenode JMX interfaces
+ if (this.nnMetrics != null) {
+ this.nnMetrics.close();
+ }
+
+ // Shut down the Router metrics
+ if (this.routerMetrics != null) {
+ this.routerMetrics.shutdown();
+ }
+ }
+
+ /**
+ * Get the metrics of the Router.
+ *
+ * @return Router metrics; null until the service is initialized.
+ */
+ public RouterMetrics getRouterMetrics() {
+ return this.routerMetrics;
+ }
+
+ /**
+ * Get the federation metrics.
+ *
+ * @return Federation metrics; null until the service is started.
+ */
+ public FederationMetrics getFederationMetrics() {
+ return this.federationMetrics;
+ }
+
+ /**
+ * Get the JVM metrics for the Router.
+ *
+ * @return JVM metrics; null if the Router metrics are not created yet.
+ */
+ public JvmMetrics getJvmMetrics() {
+ if (this.routerMetrics == null) {
+ return null;
+ }
+ return this.routerMetrics.getJvmMetrics();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 3a32be1..5c33c2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -97,6 +97,8 @@ public class RouterRpcClient {
private final ExecutorService executorService;
/** Retry policy for router -> NN communication. */
private final RetryPolicy retryPolicy;
+ /** Optional perf monitor. */
+ private final RouterRpcMonitor rpcMonitor;
/** Pattern to parse a stack trace line. */
private static final Pattern STACK_TRACE_PATTERN =
@@ -111,8 +113,7 @@ public class RouterRpcClient {
* @param monitor Optional performance monitor.
*/
public RouterRpcClient(Configuration conf, String identifier,
- ActiveNamenodeResolver resolver) {
-
+ ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) {
this.routerId = identifier;
this.namenodeResolver = resolver;
@@ -125,6 +126,8 @@ public class RouterRpcClient {
.build();
this.executorService = Executors.newCachedThreadPool(threadFactory);
+ this.rpcMonitor = monitor;
+
int maxFailoverAttempts = conf.getInt(
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
@@ -192,6 +195,15 @@ public class RouterRpcClient {
}
/**
+ * JSON representation of the connection pool.
+ *
+ * @return String representation of the JSON.
+ */
+ public String getJSON() {
+ return this.connectionManager.getJSON();
+ }
+
+ /**
* Get ClientProtocol proxy client for a NameNode. Each combination of user +
* NN must use a unique proxy client. Previously created clients are cached
* and stored in a connection pool by the ConnectionManager.
@@ -294,6 +306,9 @@ public class RouterRpcClient {
}
Object ret = null;
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOp();
+ }
boolean failover = false;
Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
for (FederationNamenodeContext namenode : namenodes) {
@@ -310,18 +325,31 @@ public class RouterRpcClient {
InetSocketAddress address = client.getAddress();
namenodeResolver.updateActiveNamenode(nsId, address);
}
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpComplete(true);
+ }
return ret;
} catch (IOException ioe) {
ioes.put(namenode, ioe);
if (ioe instanceof StandbyException) {
// Fail over indicated by retry policy and/or NN
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureStandby();
+ }
failover = true;
} else if (ioe instanceof RemoteException) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpComplete(true);
+ }
// RemoteException returned by NN
throw (RemoteException) ioe;
} else {
// Other communication error, this is a failure
// Communication retries are handled by the retry policy
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureCommunicate();
+ this.rpcMonitor.proxyOpComplete(false);
+ }
throw ioe;
}
} finally {
@@ -330,6 +358,9 @@ public class RouterRpcClient {
}
}
}
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpComplete(false);
+ }
// All namenodes were unavailable or in standby
String msg = "No namenode available to invoke " + method.getName() + " " +
@@ -746,6 +777,10 @@ public class RouterRpcClient {
}
}
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOp();
+ }
+
try {
List<Future<Object>> futures = executorService.invokeAll(callables);
Map<T, Object> results = new TreeMap<>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
new file mode 100644
index 0000000..d889a56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+
+/**
+ * Metrics and monitoring interface for the Router RPC server. Allows pluggable
+ * diagnostics and monitoring services to be attached.
+ */
+public interface RouterRpcMonitor {
+
+ /**
+ * Initialize the monitor.
+ * @param conf Configuration for the monitor.
+ * @param server Router RPC server to monitor.
+ * @param store State Store of the Router.
+ */
+ void init(
+ Configuration conf, RouterRpcServer server, StateStoreService store);
+
+ /**
+ * Close the monitor.
+ */
+ void close();
+
+ /**
+ * Start processing an operation on the Router.
+ */
+ void startOp();
+
+ /**
+ * Start proxying an operation to the Namenode.
+ * @return Id of the thread doing the proxying.
+ */
+ long proxyOp();
+
+ /**
+ * Mark a proxy operation as completed.
+ * @param success If the operation was successful.
+ */
+ void proxyOpComplete(boolean success);
+
+ /**
+ * Failed to proxy an operation to a Namenode because it was in standby.
+ */
+ void proxyOpFailureStandby();
+
+ /**
+ * Failed to proxy an operation to a Namenode because of an unexpected
+ * exception.
+ */
+ void proxyOpFailureCommunicate();
+
+ /**
+ * Failed to proxy an operation because it is not implemented.
+ */
+ void proxyOpNotImplemented();
+
+ /**
+ * The Router could not contact the State Store during an operation.
+ */
+ void routerFailureStateStore();
+
+ /**
+ * Failure because the Router is in safe mode.
+ */
+ void routerFailureSafemode();
+
+ /**
+ * Failure because a path is locked.
+ */
+ void routerFailureLocked();
+
+ /**
+ * Failure because a path is in a read only mount point.
+ */
+ void routerFailureReadOnly();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index f9b4a5d..6aee1ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -120,6 +120,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,6 +160,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;
+ /** Monitor metrics for the RPC calls. */
+ private final RouterRpcMonitor rpcMonitor;
+
/** Interface to identify the active NN for a nameservice or blockpool ID. */
private final ActiveNamenodeResolver namenodeResolver;
@@ -256,14 +260,28 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
this.rpcAddress = new InetSocketAddress(
confRpcAddress.getHostName(), listenAddress.getPort());
+ // Create metrics monitor
+ Class<? extends RouterRpcMonitor> rpcMonitorClass = this.conf.getClass(
+ DFSConfigKeys.DFS_ROUTER_METRICS_CLASS,
+ DFSConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT,
+ RouterRpcMonitor.class);
+ this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf);
+
// Create the client
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
- this.namenodeResolver);
+ this.namenodeResolver, this.rpcMonitor);
}
@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
+
+ if (this.rpcMonitor == null) {
+ LOG.error("Cannot instantiate Router RPC metrics class");
+ } else {
+ this.rpcMonitor.init(this.conf, this, this.router.getStateStore());
+ }
+
super.serviceInit(configuration);
}
@@ -281,6 +299,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
if (this.rpcServer != null) {
this.rpcServer.stop();
}
+ if (rpcMonitor != null) {
+ this.rpcMonitor.close();
+ }
super.serviceStop();
}
@@ -294,6 +315,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
}
/**
+ * Get the RPC monitor and metrics.
+ *
+ * @return RPC monitor and metrics.
+ */
+ public RouterRpcMonitor getRPCMonitor() {
+ return rpcMonitor;
+ }
+
+ /**
* Allow access to the client RPC server for testing.
*
* @return The RPC server.
@@ -330,6 +360,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
checkOperation(op);
if (!supported) {
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOpNotImplemented();
+ }
String methodName = getMethodName();
throw new UnsupportedOperationException(
"Operation \"" + methodName + "\" is not supported");
@@ -346,6 +379,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
*/
private void checkOperation(OperationCategory op) throws StandbyException {
// Log the function we are currently calling.
+ if (rpcMonitor != null) {
+ rpcMonitor.startOp();
+ }
+ // Log the function we are currently calling.
if (LOG.isDebugEnabled()) {
String methodName = getMethodName();
LOG.debug("Proxying operation: {}", methodName);
@@ -1912,16 +1949,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
*/
private List<RemoteLocation> getLocationsForPath(
String path, boolean failIfLocked) throws IOException {
- // Check the location for this path
- final PathLocation location =
- this.subclusterResolver.getDestinationForPath(path);
- if (location == null) {
- throw new IOException("Cannot find locations for " + path + " in " +
- this.subclusterResolver);
- }
+ try {
+ // Check the location for this path
+ final PathLocation location =
+ this.subclusterResolver.getDestinationForPath(path);
+ if (location == null) {
+ throw new IOException("Cannot find locations for " + path + " in " +
+ this.subclusterResolver);
+ }
- // Log the access to a path
- return location.getDestinations();
+ return location.getDestinations();
+ } catch (IOException ioe) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.routerFailureStateStore();
+ }
+ throw ioe;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 90a6699..fbece88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -140,6 +141,13 @@ public abstract class CachedRecordStore<R extends BaseRecord>
writeLock.unlock();
}
+ // Update the metrics for the cache State Store size
+ StateStoreMetrics metrics = getDriver().getMetrics();
+ if (metrics != null) {
+ String recordName = getRecordClass().getSimpleName();
+ metrics.setCacheSize(recordName, this.records.size());
+ }
+
lastUpdate = Time.monotonicNow();
}
return true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 3aa3ffd..0289ba6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -25,15 +25,23 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -87,6 +95,8 @@ public class StateStoreService extends CompositeService {
/** Service to maintain data store connection. */
private StateStoreConnectionMonitorService monitorService;
+ /** StateStore metrics. */
+ private StateStoreMetrics metrics;
/** Supported record stores. */
private final Map<
@@ -152,6 +162,21 @@ public class StateStoreService extends CompositeService {
this.cacheUpdater = new StateStoreCacheUpdateService(this);
addService(this.cacheUpdater);
+ // Create metrics for the State Store
+ this.metrics = StateStoreMetrics.create(conf);
+
+ // Adding JMX interface
+ try {
+ StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class);
+ ObjectName registeredObject =
+ MBeans.register("Router", "StateStore", bean);
+ LOG.info("Registered StateStoreMBean: {}", registeredObject);
+ } catch (NotCompliantMBeanException e) {
+ throw new RuntimeException("Bad StateStoreMBean setup", e);
+ } catch (MetricsException e) {
+ LOG.info("Failed to register State Store bean {}", e.getMessage());
+ }
+
super.serviceInit(this.conf);
}
@@ -165,6 +190,11 @@ public class StateStoreService extends CompositeService {
protected void serviceStop() throws Exception {
closeDriver();
+ if (metrics != null) {
+ metrics.shutdown();
+ metrics = null;
+ }
+
super.serviceStop();
}
@@ -228,7 +258,8 @@ public class StateStoreService extends CompositeService {
synchronized (this.driver) {
if (!isDriverReady()) {
String driverName = this.driver.getClass().getSimpleName();
- if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
+ if (this.driver.init(
+ conf, getIdentifier(), getSupportedRecords(), metrics)) {
LOG.info("Connection to the State Store driver {} is open and ready",
driverName);
this.refreshCaches();
@@ -398,4 +429,13 @@ public class StateStoreService extends CompositeService {
throw new IOException("Registered cache was not found for " + clazz);
}
+ /**
+ * Get the metrics for the State Store.
+ *
+ * @return State Store metrics.
+ */
+ public StateStoreMetrics getMetrics() {
+ return metrics;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
index 90111bf..3ebab0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
@@ -46,6 +47,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
/** Identifier for the driver. */
private String identifier;
+ /** State Store metrics. */
+ private StateStoreMetrics metrics;
+
/**
* Initialize the state store connection.
@@ -56,10 +60,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
* @return If initialized and ready, false if failed to initialize driver.
*/
public boolean init(final Configuration config, final String id,
- final Collection<Class<? extends BaseRecord>> records) {
+ final Collection<Class<? extends BaseRecord>> records,
+ final StateStoreMetrics stateStoreMetrics) {
this.conf = config;
this.identifier = id;
+ this.metrics = stateStoreMetrics;
if (this.identifier == null) {
LOG.warn("The identifier for the State Store connection is not set");
@@ -101,6 +107,15 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
}
/**
+ * Get the metrics for the State Store.
+ *
+ * @return State Store metrics.
+ */
+ public StateStoreMetrics getMetrics() {
+ return this.metrics;
+ }
+
+ /**
* Prepare the driver to access data storage.
*
* @return True if the driver was successfully initialized. If false is
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
index e2038fa..7bc93de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@@ -41,8 +42,9 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
@Override
public boolean init(final Configuration config, final String id,
- final Collection<Class<? extends BaseRecord>> records) {
- boolean ret = super.init(config, id, records);
+ final Collection<Class<? extends BaseRecord>> records,
+ final StateStoreMetrics metrics) {
+ boolean ret = super.init(config, id, records, metrics);
this.serializer = StateStoreSerializer.getSerializer(config);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index ddcd537..97c821e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
@@ -123,6 +124,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
throws IOException {
verifyDriverReady();
+ long start = monotonicNow();
List<T> ret = new ArrayList<>();
String znode = getZNodeForClass(clazz);
try {
@@ -157,11 +159,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
}
}
} catch (Exception e) {
+ getMetrics().addFailure(monotonicNow() - start);
String msg = "Cannot get children for \"" + znode + "\": " +
e.getMessage();
LOG.error(msg);
throw new IOException(msg);
}
+ long end = monotonicNow();
+ getMetrics().addRead(end - start);
return new QueryResult<T>(ret, getTime());
}
@@ -178,6 +183,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
Class<? extends BaseRecord> recordClass = record0.getClass();
String znode = getZNodeForClass(recordClass);
+ long start = monotonicNow();
boolean status = true;
for (T record : records) {
String primaryKey = getPrimaryKey(record);
@@ -187,6 +193,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
status = false;
}
}
+ long end = monotonicNow();
+ if (status) {
+ getMetrics().addWrite(end - start);
+ } else {
+ getMetrics().addFailure(end - start);
+ }
return status;
}
@@ -199,12 +211,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
}
// Read the current data
+ long start = monotonicNow();
List<T> records = null;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
+ getMetrics().addFailure(monotonicNow() - start);
return 0;
}
@@ -226,14 +240,20 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
+ getMetrics().addFailure(monotonicNow() - start);
}
}
+ long end = monotonicNow();
+ if (removed > 0) {
+ getMetrics().addRemove(end - start);
+ }
return removed;
}
@Override
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
throws IOException {
+ long start = monotonicNow();
boolean status = true;
String znode = getZNodeForClass(clazz);
LOG.info("Deleting all children under {}", znode);
@@ -248,6 +268,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
LOG.error("Cannot remove {}: {}", znode, e.getMessage());
status = false;
}
+ long time = monotonicNow() - start;
+ if (status) {
+ getMetrics().addRemove(time);
+ } else {
+ getMetrics().addFailure(time);
+ }
return status;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
index ab0ff0a..ac0b22e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
@@ -154,7 +154,7 @@ public abstract class MembershipState extends BaseRecord
public abstract void setStats(MembershipStats stats);
- public abstract MembershipStats getStats() throws IOException;
+ public abstract MembershipStats getStats();
public abstract void setLastContact(long contact);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index 16f2b8b..0a3f19d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.store.records;
import java.io.IOException;
+import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -46,6 +47,28 @@ public abstract class MountTable extends BaseRecord {
private static final Logger LOG = LoggerFactory.getLogger(MountTable.class);
+ /** Comparator for paths that treats each '/' as a space when ordering. */
+ public static final Comparator<String> PATH_COMPARATOR =
+ new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ String s1 = o1.replace('/', ' ');
+ String s2 = o2.replace('/', ' ');
+ return s1.compareTo(s2);
+ }
+ };
+
+ /** Comparator based on the mount table source. */
+ public static final Comparator<MountTable> SOURCE_COMPARATOR =
+ new Comparator<MountTable>() {
+ public int compare(MountTable m1, MountTable m2) {
+ String src1 = m1.getSourcePath();
+ String src2 = m2.getSourcePath();
+ return PATH_COMPARATOR.compare(src1, src2);
+ }
+ };
+
+
/**
* Default constructor for a mount table entry.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
index 805c2af..614957b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
@@ -288,7 +288,7 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord {
}
@Override
- public MembershipStats getStats() throws IOException {
+ public MembershipStats getStats() {
NamenodeMembershipStatsRecordProto statsProto =
this.translator.getProtoOrBuilder().getStats();
MembershipStats stats =
@@ -298,7 +298,8 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord {
statsPB.setProto(statsProto);
return statsPB;
} else {
- throw new IOException("Cannot get stats for the membership");
+ throw new IllegalArgumentException(
+ "Cannot get stats for the membership");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index e80227d..920fb3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4726,7 +4726,24 @@
</description>
</property>
- <property>
+ <property>
+ <name>dfs.federation.router.metrics.enable</name>
+ <value>true</value>
+ <description>
+ If true, the metrics in the router are enabled.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.metrics.class</name>
+ <value>org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor</value>
+ <description>
+ Class to monitor the RPC system in the router. It must implement the
+ RouterRpcMonitor interface.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.admin.enable</name>
<value>true</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index bbb548ca..0c01763 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -26,12 +26,18 @@ import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;
import java.util.Random;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -135,6 +141,13 @@ public final class FederationTestUtils {
return Math.abs(d1.getTime() - d2.getTime()) < precision;
}
+ public static <T> T getBean(String name, Class<T> obj)
+ throws MalformedObjectNameException {
+ MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ ObjectName poolName = new ObjectName(name);
+ return JMX.newMXBeanProxy(mBeanServer, poolName, obj);
+ }
+
public static boolean addDirectory(FileSystem context, String path)
throws IOException {
context.mkdirs(new Path(path), new FsPermission("777"));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index cac5e6b..58ca1d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -32,6 +32,7 @@ public class RouterConfigBuilder {
private boolean enableHeartbeat = false;
private boolean enableLocalHeartbeat = false;
private boolean enableStateStore = false;
+ private boolean enableMetrics = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@@ -47,6 +48,7 @@ public class RouterConfigBuilder {
this.enableHeartbeat = true;
this.enableLocalHeartbeat = true;
this.enableStateStore = true;
+ this.enableMetrics = true;
return this;
}
@@ -75,6 +77,11 @@ public class RouterConfigBuilder {
return this;
}
+ public RouterConfigBuilder metrics(boolean enable) {
+ this.enableMetrics = enable;
+ return this;
+ }
+
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@@ -91,6 +98,10 @@ public class RouterConfigBuilder {
return this.stateStore(true);
}
+ public RouterConfigBuilder metrics() {
+ return this.metrics(true);
+ }
+
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@@ -101,6 +112,8 @@ public class RouterConfigBuilder {
this.enableHeartbeat);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
this.enableLocalHeartbeat);
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
+ this.enableMetrics);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
new file mode 100644
index 0000000..d6a194f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.management.MalformedObjectNameException;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Test;
+
+/**
+ * Test the JMX interface for the {@link Router}.
+ */
+public class TestFederationMetrics extends TestMetricsBase {
+
+ public static final String FEDERATION_BEAN =
+ "Hadoop:service=Router,name=FederationState";
+ public static final String STATE_STORE_BEAN =
+ "Hadoop:service=Router,name=StateStore";
+ public static final String RPC_BEAN =
+ "Hadoop:service=Router,name=FederationRPC";
+
+ @Test
+ public void testClusterStatsJMX()
+ throws MalformedObjectNameException, IOException {
+
+ FederationMBean bean = getBean(FEDERATION_BEAN, FederationMBean.class);
+ validateClusterStatsBean(bean);
+ }
+
+ @Test
+ public void testClusterStatsDataSource() throws IOException {
+ FederationMetrics metrics = getRouter().getMetrics();
+ validateClusterStatsBean(metrics);
+ }
+
+ @Test
+ public void testMountTableStatsDataSource()
+ throws IOException, JSONException {
+
+ FederationMetrics metrics = getRouter().getMetrics();
+ String jsonString = metrics.getMountTable();
+ JSONArray jsonArray = new JSONArray(jsonString);
+ assertEquals(jsonArray.length(), getMockMountTable().size());
+
+ int match = 0;
+ for (int i = 0; i < jsonArray.length(); i++) {
+ JSONObject json = jsonArray.getJSONObject(i);
+ String src = json.getString("sourcePath");
+
+ for (MountTable entry : getMockMountTable()) {
+ if (entry.getSourcePath().equals(src)) {
+ assertEquals(entry.getDefaultLocation().getNameserviceId(),
+ json.getString("nameserviceId"));
+ assertEquals(entry.getDefaultLocation().getDest(),
+ json.getString("path"));
+ assertNotNullAndNotEmpty(json.getString("dateCreated"));
+ assertNotNullAndNotEmpty(json.getString("dateModified"));
+ match++;
+ }
+ }
+ }
+ assertEquals(match, getMockMountTable().size());
+ }
+
+ private MembershipState findMockNamenode(String nsId, String nnId) {
+
+ @SuppressWarnings("unchecked")
+ List<MembershipState> namenodes =
+ ListUtils.union(getActiveMemberships(), getStandbyMemberships());
+ for (MembershipState nn : namenodes) {
+ if (nn.getNamenodeId().equals(nnId)
+ && nn.getNameserviceId().equals(nsId)) {
+ return nn;
+ }
+ }
+ return null;
+ }
+
+ @Test
+ public void testNamenodeStatsDataSource() throws IOException, JSONException {
+
+ FederationMetrics metrics = getRouter().getMetrics();
+ String jsonString = metrics.getNamenodes();
+ JSONObject jsonObject = new JSONObject(jsonString);
+ Iterator<?> keys = jsonObject.keys();
+ int nnsFound = 0;
+ while (keys.hasNext()) {
+ // Validate each entry against our mocks
+ JSONObject json = jsonObject.getJSONObject((String) keys.next());
+ String nameserviceId = json.getString("nameserviceId");
+ String namenodeId = json.getString("namenodeId");
+
+ MembershipState mockEntry =
+ this.findMockNamenode(nameserviceId, namenodeId);
+ assertNotNull(mockEntry);
+
+ assertEquals(json.getString("state"), mockEntry.getState().toString());
+ MembershipStats stats = mockEntry.getStats();
+ assertEquals(json.getLong("numOfActiveDatanodes"),
+ stats.getNumOfActiveDatanodes());
+ assertEquals(json.getLong("numOfDeadDatanodes"),
+ stats.getNumOfDeadDatanodes());
+ assertEquals(json.getLong("numOfDecommissioningDatanodes"),
+ stats.getNumOfDecommissioningDatanodes());
+ assertEquals(json.getLong("numOfDecomActiveDatanodes"),
+ stats.getNumOfDecomActiveDatanodes());
+ assertEquals(json.getLong("numOfDecomDeadDatanodes"),
+ stats.getNumOfDecomDeadDatanodes());
+ assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks());
+ assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress());
+ assertEquals(json.getString("webAddress"), mockEntry.getWebAddress());
+ nnsFound++;
+ }
+ // Validate all memberships are present
+ assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
+ nnsFound);
+ }
+
+ @Test
+ public void testNameserviceStatsDataSource()
+ throws IOException, JSONException {
+
+ FederationMetrics metrics = getRouter().getMetrics();
+ String jsonString = metrics.getNameservices();
+ JSONObject jsonObject = new JSONObject(jsonString);
+ Iterator<?> keys = jsonObject.keys();
+ int nameservicesFound = 0;
+ while (keys.hasNext()) {
+ JSONObject json = jsonObject.getJSONObject((String) keys.next());
+ String nameserviceId = json.getString("nameserviceId");
+ String namenodeId = json.getString("namenodeId");
+
+ MembershipState mockEntry =
+ this.findMockNamenode(nameserviceId, namenodeId);
+ assertNotNull(mockEntry);
+
+ // NS should report the active NN
+ assertEquals(mockEntry.getState().toString(), json.getString("state"));
+ assertEquals("ACTIVE", json.getString("state"));
+
+ // Stats in the NS should reflect the stats for the most active NN
+ MembershipStats stats = mockEntry.getStats();
+ assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles"));
+ assertEquals(stats.getTotalSpace(), json.getLong("totalSpace"));
+ assertEquals(stats.getAvailableSpace(),
+ json.getLong("availableSpace"));
+ assertEquals(stats.getNumOfBlocksMissing(),
+ json.getLong("numOfBlocksMissing"));
+ assertEquals(stats.getNumOfActiveDatanodes(),
+ json.getLong("numOfActiveDatanodes"));
+ assertEquals(stats.getNumOfDeadDatanodes(),
+ json.getLong("numOfDeadDatanodes"));
+ assertEquals(stats.getNumOfDecommissioningDatanodes(),
+ json.getLong("numOfDecommissioningDatanodes"));
+ assertEquals(stats.getNumOfDecomActiveDatanodes(),
+ json.getLong("numOfDecomActiveDatanodes"));
+ assertEquals(stats.getNumOfDecomDeadDatanodes(),
+ json.getLong("numOfDecomDeadDatanodes"));
+ nameservicesFound++;
+ }
+ assertEquals(getNameservices().size(), nameservicesFound);
+ }
+
+ private void assertNotNullAndNotEmpty(String field) {
+ assertNotNull(field);
+ assertTrue(field.length() > 0);
+ }
+
+ private void validateClusterStatsBean(FederationMBean bean)
+ throws IOException {
+
+ // Determine aggregates
+ long numBlocks = 0;
+ long numLive = 0;
+ long numDead = 0;
+ long numDecom = 0;
+ long numDecomLive = 0;
+ long numDecomDead = 0;
+ long numFiles = 0;
+ for (MembershipState mock : getActiveMemberships()) {
+ MembershipStats stats = mock.getStats();
+ numBlocks += stats.getNumOfBlocks();
+ numLive += stats.getNumOfActiveDatanodes();
+ numDead += stats.getNumOfDeadDatanodes();
+ numDecom += stats.getNumOfDecommissioningDatanodes();
+ numDecomLive += stats.getNumOfDecomActiveDatanodes();
+ numDecomDead += stats.getNumOfDecomDeadDatanodes();
+ }
+
+ assertEquals(numBlocks, bean.getNumBlocks());
+ assertEquals(numLive, bean.getNumLiveNodes());
+ assertEquals(numDead, bean.getNumDeadNodes());
+ assertEquals(numDecom, bean.getNumDecommissioningNodes());
+ assertEquals(numDecomLive, bean.getNumDecomLiveNodes());
+ assertEquals(numDecomDead, bean.getNumDecomDeadNodes());
+ assertEquals(numFiles, bean.getNumFiles());
+ assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
+ bean.getNumNamenodes());
+ assertEquals(getNameservices().size(), bean.getNumNameservices());
+ assertTrue(bean.getVersion().length() > 0);
+ assertTrue(bean.getCompiledDate().length() > 0);
+ assertTrue(bean.getCompileInfo().length() > 0);
+ assertTrue(bean.getRouterStarted().length() > 0);
+ assertTrue(bean.getHostAndPort().length() > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
new file mode 100644
index 0000000..bbcfbe8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearAllRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test the basic metrics functionality.
+ */
+public class TestMetricsBase {
+
+ private StateStoreService stateStore;
+ private MembershipStore membershipStore;
+ private Router router;
+ private Configuration routerConfig;
+
+ private List<MembershipState> activeMemberships;
+ private List<MembershipState> standbyMemberships;
+ private List<MountTable> mockMountTable;
+ private List<String> nameservices;
+
+ @Before
+ public void setupBase() throws Exception {
+
+ if (router == null) {
+ routerConfig = new RouterConfigBuilder()
+ .stateStore()
+ .metrics()
+ .build();
+ router = new Router();
+ router.init(routerConfig);
+ router.setRouterId("routerId");
+ router.start();
+ stateStore = router.getStateStore();
+
+ membershipStore =
+ stateStore.getRegisteredRecordStore(MembershipStore.class);
+
+ // Read all data and load all caches
+ waitStateStore(stateStore, 10000);
+ createFixtures();
+ stateStore.refreshCaches(true);
+ Thread.sleep(1000);
+ }
+ }
+
+ @After
+ public void tearDownBase() throws IOException {
+ if (router != null) {
+ router.stop();
+ router.close();
+ router = null;
+ }
+ }
+
+ private void createFixtures() throws IOException {
+ // Clear all records
+ clearAllRecords(stateStore);
+
+ nameservices = new ArrayList<>();
+ nameservices.add(NAMESERVICES[0]);
+ nameservices.add(NAMESERVICES[1]);
+
+ // 2 NNs per NS
+ activeMemberships = new ArrayList<>();
+ standbyMemberships = new ArrayList<>();
+
+ for (String nameservice : nameservices) {
+ MembershipState namenode1 = createMockRegistrationForNamenode(
+ nameservice, NAMENODES[0], FederationNamenodeServiceState.ACTIVE);
+ NamenodeHeartbeatRequest request1 =
+ NamenodeHeartbeatRequest.newInstance(namenode1);
+ assertTrue(membershipStore.namenodeHeartbeat(request1).getResult());
+ activeMemberships.add(namenode1);
+
+ MembershipState namenode2 = createMockRegistrationForNamenode(
+ nameservice, NAMENODES[1], FederationNamenodeServiceState.STANDBY);
+ NamenodeHeartbeatRequest request2 =
+ NamenodeHeartbeatRequest.newInstance(namenode2);
+ assertTrue(membershipStore.namenodeHeartbeat(request2).getResult());
+ standbyMemberships.add(namenode2);
+ }
+
+ // Add the mock mount table entries
+ mockMountTable = createMockMountTable(nameservices);
+ synchronizeRecords(stateStore, mockMountTable, MountTable.class);
+ }
+
+ protected Router getRouter() {
+ return router;
+ }
+
+ protected List<MountTable> getMockMountTable() {
+ return mockMountTable;
+ }
+
+ protected List<MembershipState> getActiveMemberships() {
+ return activeMemberships;
+ }
+
+ protected List<MembershipState> getStandbyMemberships() {
+ return standbyMemberships;
+ }
+
+ protected List<String> getNameservices() {
+ return nameservices;
+ }
+
+ protected StateStoreService getStateStore() {
+ return stateStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index 2074d3d..e22be84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.service.Service.STATE;
import org.junit.After;
import org.junit.AfterClass;
@@ -46,15 +48,19 @@ public class TestRouter {
public static void create() throws IOException {
// Basic configuration without the state store
conf = new Configuration();
+ // 1 ms cache refresh (the key is expressed in milliseconds)
+ conf.setInt(DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
// Mock resolver classes
- conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
- MockResolver.class.getCanonicalName());
- conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
- MockResolver.class.getCanonicalName());
+ conf.setClass(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class, ActiveNamenodeResolver.class);
+ conf.setClass(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class, FileSubclusterResolver.class);
// Bind to any available port
conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+ conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+ conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
// Simulate a co-located NN
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
@@ -95,9 +101,18 @@ public class TestRouter {
@Test
public void testRouterService() throws InterruptedException, IOException {
+ // Admin only
+ testRouterStartup(new RouterConfigBuilder(conf).admin().build());
+
// Rpc only
testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
+ // Metrics only
+ testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
+
+ // Statestore only
+ testRouterStartup(new RouterConfigBuilder(conf).stateStore().build());
+
// Heartbeat only
testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a4ced36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 65e763b..01fe149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@@ -377,6 +378,74 @@ public class TestStateStoreDriverBase {
testFetchErrors(driver, MountTable.class);
}
+ public void testMetrics(StateStoreDriver driver)
+ throws IOException, IllegalArgumentException, IllegalAccessException {
+ // Checks that each driver call below bumps the matching StateStoreMetrics counter; counters are reset between sections.
+ MountTable insertRecord =
+ this.generateFakeRecord(MountTable.class);
+
+ // Put single: one put of the generated record is expected to count as one write op
+ StateStoreMetrics metrics = stateStore.getMetrics();
+ assertEquals(0, metrics.getWriteOps());
+ driver.put(insertRecord, true, false);
+ assertEquals(1, metrics.getWriteOps());
+
+ // Put again after a reset. NOTE(review): originally labelled "Put multiple", but this repeats the single-record put above - confirm intent
+ metrics.reset();
+ assertEquals(0, metrics.getWriteOps());
+ driver.put(insertRecord, true, false);
+ assertEquals(1, metrics.getWriteOps());
+
+ // Get single: query built from the inserted record's source path, expected to count as one read op
+ metrics.reset();
+ assertEquals(0, metrics.getReadOps());
+
+ final String querySourcePath = insertRecord.getSourcePath();
+ MountTable partial = MountTable.newInstance();
+ partial.setSourcePath(querySourcePath);
+ final Query<MountTable> query = new Query<>(partial);
+ driver.get(MountTable.class, query);
+ assertEquals(1, metrics.getReadOps());
+
+ // Get all: fetch by record class only, without a query
+ metrics.reset();
+ assertEquals(0, metrics.getReadOps());
+ driver.get(MountTable.class);
+ assertEquals(1, metrics.getReadOps());
+
+ // Get multiple: reuses the source path query built above
+ metrics.reset();
+ assertEquals(0, metrics.getReadOps());
+ driver.getMultiple(MountTable.class, query);
+ assertEquals(1, metrics.getReadOps());
+
+ // Insert fails: same record with the put flags flipped to (false, true); presumably a no-overwrite insert of an existing record - confirm against StateStoreDriver
+ metrics.reset();
+ assertEquals(0, metrics.getFailureOps());
+ driver.put(insertRecord, false, true);
+ assertEquals(1, metrics.getFailureOps());
+
+ // Remove single: remove the inserted record itself, expected to count as one remove op
+ metrics.reset();
+ assertEquals(0, metrics.getRemoveOps());
+ driver.remove(insertRecord);
+ assertEquals(1, metrics.getRemoveOps());
+
+ // Remove multiple: re-insert first (the put must not count as a remove), then remove via the source path query
+ metrics.reset();
+ driver.put(insertRecord, true, false);
+ assertEquals(0, metrics.getRemoveOps());
+ driver.remove(MountTable.class, query);
+ assertEquals(1, metrics.getRemoveOps());
+
+ // Remove all: re-insert first (the put must not count as a remove), then call removeAll for the MountTable class
+ metrics.reset();
+ driver.put(insertRecord, true, false);
+ assertEquals(0, metrics.getRemoveOps());
+ driver.removeAll(MountTable.class);
+ assertEquals(1, metrics.getRemoveOps());
+ }
+
/**
* Sets the value of a field on the object.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org