You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/15 19:36:34 UTC
[43/50] [abbrv] hadoop git commit: HDFS-7426. Change nntop JMX format
to be a JSON blob.
HDFS-7426. Change nntop JMX format to be a JSON blob.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa7b9248
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa7b9248
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa7b9248
Branch: refs/heads/YARN-2139
Commit: fa7b9248e415c04bb555772f44fadaf8d9f34974
Parents: e5a6925
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Dec 12 17:04:33 2014 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Dec 12 17:04:33 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hdfs/server/namenode/FSNamesystem.java | 38 +++-
.../namenode/metrics/FSNamesystemMBean.java | 7 +
.../server/namenode/top/TopAuditLogger.java | 20 +-
.../hdfs/server/namenode/top/TopConf.java | 29 +--
.../server/namenode/top/metrics/TopMetrics.java | 216 ++++--------------
.../top/window/RollingWindowManager.java | 223 +++++++++++++------
.../server/namenode/TestFSNamesystemMBean.java | 2 +
.../server/namenode/TestNameNodeMXBean.java | 116 ++++++++++
.../namenode/metrics/TestNameNodeMetrics.java | 59 -----
.../top/window/TestRollingWindowManager.java | 63 +++---
11 files changed, 417 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index eeedb0d..9dfecc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -455,6 +455,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7509. Avoid resolving path multiple times. (jing9)
+ HDFS-7426. Change nntop JMX format to be a JSON blob. (wang)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b4b897a..1ac19fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -120,6 +120,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -241,6 +242,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -281,6 +283,7 @@ import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
@@ -539,6 +542,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final FSImage fsImage;
+ private final TopConf topConf;
+ private TopMetrics topMetrics;
+
/**
* Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use
@@ -842,6 +848,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
this.snapshotManager = new SnapshotManager(dir);
this.cacheManager = new CacheManager(this, conf, blockManager);
this.safeMode = new SafeModeInfo(conf);
+ this.topConf = new TopConf(conf);
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
@@ -952,13 +959,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// Add audit logger to calculate top users
- if (conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
- DFSConfigKeys.NNTOP_ENABLED_DEFAULT)) {
- String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
- TopConf nntopConf = new TopConf(conf);
- TopMetrics.initSingleton(conf, NamenodeRole.NAMENODE.name(), sessionId,
- nntopConf.nntopReportingPeriodsMs);
- auditLoggers.add(new TopAuditLogger());
+ if (topConf.isEnabled) {
+ topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
+ auditLoggers.add(new TopAuditLogger(topMetrics));
}
return Collections.unmodifiableList(auditLoggers);
@@ -6013,6 +6016,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return getBlockManager().getDatanodeManager().getNumStaleStorages();
}
+ @Override // FSNamesystemMBean
+ public String getTopUserOpCounts() {
+ if (!topConf.isEnabled) {
+ return null;
+ }
+
+ Date now = new Date();
+ final List<RollingWindowManager.TopWindow> topWindows =
+ topMetrics.getTopWindows();
+ Map<String, Object> topMap = new TreeMap<String, Object>();
+ topMap.put("windows", topWindows);
+ topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ return mapper.writeValueAsString(topMap);
+ } catch (IOException e) {
+ LOG.warn("Failed to fetch TopUser metrics", e);
+ }
+ return null;
+ }
+
/**
* Increments, logs and then returns the stamp
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
index 708591b..86f4bd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
@@ -164,4 +164,11 @@ public interface FSNamesystemMBean {
*/
public int getNumStaleStorages();
+ /**
+ * Returns a nested JSON object listing the top users for different RPC
+ * operations over tracked time windows.
+ *
+ * @return JSON string
+ */
+ public String getTopUserOpCounts();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
index 4f26b17..49c9153 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.top;
import java.net.InetAddress;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -36,6 +37,14 @@ import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
public class TopAuditLogger implements AuditLogger {
public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class);
+ private final TopMetrics topMetrics;
+
+ public TopAuditLogger(TopMetrics topMetrics) {
+ Preconditions.checkNotNull(topMetrics, "Cannot init with a null " +
+ "TopMetrics");
+ this.topMetrics = topMetrics;
+ }
+
@Override
public void initialize(Configuration conf) {
}
@@ -43,12 +52,11 @@ public class TopAuditLogger implements AuditLogger {
@Override
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst, FileStatus status) {
-
- TopMetrics instance = TopMetrics.getInstance();
- if (instance != null) {
- instance.report(succeeded, userName, addr, cmd, src, dst, status);
- } else {
- LOG.error("TopMetrics is not initialized yet!");
+ try {
+ topMetrics.report(succeeded, userName, addr, cmd, src, dst, status);
+ } catch (Throwable t) {
+ LOG.error("An error occurred while reflecting the event in top service, "
+ + "event: (cmd={},userName={})", cmd, userName);
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
index 0f4ebac..ba82032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -27,34 +30,34 @@ import com.google.common.base.Preconditions;
*/
@InterfaceAudience.Private
public final class TopConf {
-
- public static final String TOP_METRICS_REGISTRATION_NAME = "topusers";
- public static final String TOP_METRICS_RECORD_NAME = "topparam";
/**
- * A meta command representing the total number of commands
+ * Whether TopMetrics are enabled
*/
- public static final String CMD_TOTAL = "total";
+ public final boolean isEnabled;
+
/**
- * A meta user representing all users
+ * A meta command representing the total number of calls to all commands
*/
- public static String ALL_USERS = "ALL";
+ public static final String ALL_CMDS = "*";
/**
* nntop reporting periods in milliseconds
*/
- public final long[] nntopReportingPeriodsMs;
+ public final int[] nntopReportingPeriodsMs;
public TopConf(Configuration conf) {
+ isEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
+ DFSConfigKeys.NNTOP_ENABLED_DEFAULT);
String[] periodsStr = conf.getTrimmedStrings(
DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY,
DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT);
- nntopReportingPeriodsMs = new long[periodsStr.length];
+ nntopReportingPeriodsMs = new int[periodsStr.length];
for (int i = 0; i < periodsStr.length; i++) {
- nntopReportingPeriodsMs[i] = Integer.parseInt(periodsStr[i]) *
- 60L * 1000L; //min to ms
+ nntopReportingPeriodsMs[i] = Ints.checkedCast(
+ TimeUnit.MINUTES.toMillis(Integer.parseInt(periodsStr[i])));
}
- for (long aPeriodMs: nntopReportingPeriodsMs) {
- Preconditions.checkArgument(aPeriodMs >= 60L * 1000L,
+ for (int aPeriodMs: nntopReportingPeriodsMs) {
+ Preconditions.checkArgument(aPeriodMs >= TimeUnit.MINUTES.toMillis(1),
"minimum reporting period is 1 min!");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
index e8a4e23..ab55392 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
@@ -17,67 +17,50 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top.metrics;
-import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
-import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
-import static org.apache.hadoop.metrics2.lib.Interns.info;
-
import java.net.InetAddress;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
-/***
- * The interface to the top metrics
+/**
+ * The interface to the top metrics.
+ * <p/>
+ * Metrics are collected by a custom audit logger, {@link org.apache.hadoop
+ * .hdfs.server.namenode.top.TopAuditLogger}, which calls TopMetrics to
+ * increment per-operation, per-user counts on every audit log call. These
+ * counts are used to show the top users by NameNode operation as well as
+ * across all operations.
+ * <p/>
+ * TopMetrics maintains these counts for a configurable number of time
+ * intervals, e.g. 1min, 5min, 25min. Each interval is tracked by a
+ * RollingWindowManager.
* <p/>
- * The producers use the {@link #report} method to report events and the
- * consumers use {@link #getMetrics(MetricsCollector, boolean)} to retrieve the
- * current top metrics. The default consumer is JMX but it could be any other
- * user interface.
+ * These metrics are published as a JSON string via {@link org.apache.hadoop
+ * .hdfs.server .namenode.metrics.FSNamesystemMBean#getTopWindows}. This is
+ * done by calling {@link org.apache.hadoop.hdfs.server.namenode.top.window
+ * .RollingWindowManager#snapshot} on each RollingWindowManager.
* <p/>
* Thread-safe: relies on thread-safety of RollingWindowManager
*/
@InterfaceAudience.Private
-public class TopMetrics implements MetricsSource {
+public class TopMetrics {
public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
- enum Singleton {
- INSTANCE;
-
- volatile TopMetrics impl = null;
-
- synchronized TopMetrics init(Configuration conf, String processName,
- String sessionId, long[] reportingPeriods) {
- if (impl == null) {
- impl =
- create(conf, processName, sessionId, reportingPeriods,
- DefaultMetricsSystem.instance());
- }
- logConf(conf);
- return impl;
- }
- }
-
private static void logConf(Configuration conf) {
LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
" = " + conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
@@ -88,127 +71,34 @@ public class TopMetrics implements MetricsSource {
}
/**
- * Return only the shortest periods for default
- * TODO: make it configurable
- */
- final boolean smallestOnlyDefault = true;
-
- /**
- * The smallest of reporting periods
- */
- long smallestPeriod = Long.MAX_VALUE;
-
- /**
- * processName and sessionId might later be leveraged later when we aggregate
- * report from multiple federated name nodes
- */
- final String processName, sessionId;
-
- /**
* A map from reporting periods to WindowManager. Thread-safety is provided by
* the fact that the mapping is not changed after construction.
*/
- final Map<Long, RollingWindowManager> rollingWindowManagers =
- new HashMap<Long, RollingWindowManager>();
+ final Map<Integer, RollingWindowManager> rollingWindowManagers =
+ new HashMap<Integer, RollingWindowManager>();
- TopMetrics(Configuration conf, String processName, String sessionId,
- long[] reportingPeriods) {
- this.processName = processName;
- this.sessionId = sessionId;
+ public TopMetrics(Configuration conf, int[] reportingPeriods) {
+ logConf(conf);
for (int i = 0; i < reportingPeriods.length; i++) {
- smallestPeriod = Math.min(smallestPeriod, reportingPeriods[i]);
rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
conf, reportingPeriods[i]));
}
}
- public static TopMetrics create(Configuration conf, String processName,
- String sessionId, long[] reportingPeriods, MetricsSystem ms) {
- return ms.register(TopConf.TOP_METRICS_REGISTRATION_NAME,
- "top metrics of the namenode in a last period of time", new TopMetrics(
- conf, processName, sessionId, reportingPeriods));
- }
-
- public static TopMetrics initSingleton(Configuration conf,
- String processName, String sessionId, long[] reportingPeriods) {
- return Singleton.INSTANCE.init(conf, processName, sessionId,
- reportingPeriods);
- }
-
- public static TopMetrics getInstance() {
- TopMetrics topMetrics = Singleton.INSTANCE.impl;
- Preconditions.checkArgument(topMetrics != null,
- "The TopMetric singleton instance is not initialized."
- + " Have you called initSingleton first?");
- return topMetrics;
- }
-
/**
- * In testing, the previous initialization should be reset if the entire
- * metric system is reinitialized
+ * Get a list of the current TopWindow statistics, one TopWindow per tracked
+ * time interval.
*/
- @VisibleForTesting
- public static void reset() {
- Singleton.INSTANCE.impl = null;
- }
-
- @Override
- public void getMetrics(MetricsCollector collector, boolean all) {
- long realTime = Time.monotonicNow();
- getMetrics(smallestOnlyDefault, realTime, collector, all);
- }
-
- public void getMetrics(boolean smallestOnly, long currTime,
- MetricsCollector collector, boolean all) {
- for (Entry<Long, RollingWindowManager> entry : rollingWindowManagers
+ public List<TopWindow> getTopWindows() {
+ long monoTime = Time.monotonicNow();
+ List<TopWindow> windows = Lists.newArrayListWithCapacity
+ (rollingWindowManagers.size());
+ for (Entry<Integer, RollingWindowManager> entry : rollingWindowManagers
.entrySet()) {
- if (!smallestOnly || smallestPeriod == entry.getKey()) {
- getMetrics(currTime, collector, entry.getKey(), entry.getValue(), all);
- }
- }
- }
-
- /**
- * Get metrics for a particular recording period and its corresponding
- * {@link RollingWindowManager}
- * <p/>
- *
- * @param collector the metric collector
- * @param period the reporting period
- * @param rollingWindowManager the window manager corresponding to the
- * reporting period
- * @param all currently ignored
- */
- void getMetrics(long currTime, MetricsCollector collector, Long period,
- RollingWindowManager rollingWindowManager, boolean all) {
- MetricsRecordBuilder rb =
- collector.addRecord(createTopMetricsRecordName(period))
- .setContext("namenode").tag(ProcessName, processName)
- .tag(SessionId, sessionId);
-
- MetricValueMap snapshotMetrics = rollingWindowManager.snapshot(currTime);
- LOG.debug("calling snapshot, result size is: " + snapshotMetrics.size());
- for (Map.Entry<String, Number> entry : snapshotMetrics.entrySet()) {
- String key = entry.getKey();
- Number value = entry.getValue();
- LOG.debug("checking an entry: key: {} value: {}", key, value);
- long min = period / 1000L / 60L; //ms -> min
- String desc = "top user of name node in the past " + min + " minutes";
-
- if (value instanceof Integer) {
- rb.addGauge(info(key, desc), (Integer) value);
- } else if (value instanceof Long) {
- rb.addGauge(info(key, desc), (Long) value);
- } else if (value instanceof Float) {
- rb.addGauge(info(key, desc), (Float) value);
- } else if (value instanceof Double) {
- rb.addGauge(info(key, desc), (Double) value);
- } else {
- LOG.warn("Unsupported metric type: " + value.getClass());
- }
+ TopWindow window = entry.getValue().snapshot(monoTime);
+ windows.add(window);
}
- LOG.debug("END iterating over metrics, result size is: {}",
- snapshotMetrics.size());
+ return windows;
}
/**
@@ -216,18 +106,10 @@ public class TopMetrics implements MetricsSource {
* log file. This is to be consistent when {@link TopMetrics} is charged with
* data read back from log files instead of being invoked directly by the
* FsNamesystem
- *
- * @param succeeded
- * @param userName
- * @param addr
- * @param cmd
- * @param src
- * @param dst
- * @param status
*/
public void report(boolean succeeded, String userName, InetAddress addr,
String cmd, String src, String dst, FileStatus status) {
- //currently we nntop makes use of only the username and the command
+ // currently nntop only makes use of the username and the command
report(userName, cmd);
}
@@ -239,27 +121,11 @@ public class TopMetrics implements MetricsSource {
public void report(long currTime, String userName, String cmd) {
LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
userName = UserGroupInformation.trimLoginMethod(userName);
- try {
- for (RollingWindowManager rollingWindowManager : rollingWindowManagers
- .values()) {
- rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
- rollingWindowManager.recordMetric(currTime,
- TopConf.CMD_TOTAL, userName, 1);
- }
- } catch (Throwable t) {
- LOG.error("An error occurred while reflecting the event in top service, "
- + "event: (time,cmd,userName)=(" + currTime + "," + cmd + ","
- + userName);
+ for (RollingWindowManager rollingWindowManager : rollingWindowManagers
+ .values()) {
+ rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
+ rollingWindowManager.recordMetric(currTime,
+ TopConf.ALL_CMDS, userName, 1);
}
}
-
- /***
- *
- * @param period the reporting period length in ms
- * @return
- */
- public static String createTopMetricsRecordName(Long period) {
- return TopConf.TOP_METRICS_RECORD_NAME + "-" + period;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
index d818cce..00e7087 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
@@ -17,21 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
+import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
/**
* A class to manage the set of {@link RollingWindow}s. This class is the
@@ -46,25 +47,93 @@ public class RollingWindowManager {
public static final Logger LOG = LoggerFactory.getLogger(
RollingWindowManager.class);
- private int windowLenMs;
- private int bucketsPerWindow; // e.g., 10 buckets per minute
- private int topUsersCnt; // e.g., report top 10 metrics
+ private final int windowLenMs;
+ private final int bucketsPerWindow; // e.g., 10 buckets per minute
+ private final int topUsersCnt; // e.g., report top 10 metrics
+
+ static private class RollingWindowMap extends
+ ConcurrentHashMap<String, RollingWindow> {
+ private static final long serialVersionUID = -6785807073237052051L;
+ }
/**
- * Create a metric name composed of the command and user
- *
- * @param command the command executed
- * @param user the user
- * @return a composed metric name
+ * Represents a snapshot of the rolling window. It contains one Op per
+ * operation in the window, with ranked users for each Op.
*/
- @VisibleForTesting
- public static String createMetricName(String command, String user) {
- return command + "." + user;
+ public static class TopWindow {
+ private final int windowMillis;
+ private final List<Op> top;
+
+ public TopWindow(int windowMillis) {
+ this.windowMillis = windowMillis;
+ this.top = Lists.newArrayList();
+ }
+
+ public void addOp(Op op) {
+ top.add(op);
+ }
+
+ public int getWindowLenMs() {
+ return windowMillis;
+ }
+
+ public List<Op> getOps() {
+ return top;
+ }
}
- static private class RollingWindowMap extends
- ConcurrentHashMap<String, RollingWindow> {
- private static final long serialVersionUID = -6785807073237052051L;
+ /**
+ * Represents an operation within a TopWindow. It contains a ranked
+ * set of the top users for the operation.
+ */
+ public static class Op {
+ private final String opType;
+ private final List<User> topUsers;
+ private final long totalCount;
+
+ public Op(String opType, long totalCount) {
+ this.opType = opType;
+ this.topUsers = Lists.newArrayList();
+ this.totalCount = totalCount;
+ }
+
+ public void addUser(User u) {
+ topUsers.add(u);
+ }
+
+ public String getOpType() {
+ return opType;
+ }
+
+ public List<User> getTopUsers() {
+ return topUsers;
+ }
+
+ public long getTotalCount() {
+ return totalCount;
+ }
+ }
+
+ /**
+ * Represents a user who called an Op within a TopWindow. Specifies the
+ * user and the number of times the user called the operation.
+ */
+ public static class User {
+ private final String user;
+ private final long count;
+
+ public User(String user, long count) {
+ this.user = user;
+ this.count = count;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public long getCount() {
+ return count;
+ }
}
/**
@@ -75,8 +144,9 @@ public class RollingWindowManager {
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
new ConcurrentHashMap<String, RollingWindowMap>();
- public RollingWindowManager(Configuration conf, long reportingPeriodMs) {
- windowLenMs = (int) reportingPeriodMs;
+ public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
+
+ windowLenMs = reportingPeriodMs;
bucketsPerWindow =
conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY,
DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT);
@@ -112,53 +182,71 @@ public class RollingWindowManager {
* Take a snapshot of current top users in the past period.
*
* @param time the current time
- * @return a map between the top metrics and their values. The user is encoded
- * in the metric name. Refer to {@link RollingWindowManager#createMetricName} for
- * the actual format.
+ * @return a TopWindow describing the top users for each metric in the
+ * window.
*/
- public MetricValueMap snapshot(long time) {
- MetricValueMap map = new MetricValueMap();
- Set<String> metricNames = metricMap.keySet();
- LOG.debug("iterating in reported metrics, size={} values={}",
- metricNames.size(), metricNames);
- for (Map.Entry<String,RollingWindowMap> rwEntry: metricMap.entrySet()) {
- String metricName = rwEntry.getKey();
- RollingWindowMap rollingWindows = rwEntry.getValue();
- TopN topN = new TopN(topUsersCnt);
- Iterator<Map.Entry<String, RollingWindow>> iterator =
- rollingWindows.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, RollingWindow> entry = iterator.next();
- String userName = entry.getKey();
- RollingWindow aWindow = entry.getValue();
- long windowSum = aWindow.getSum(time);
- // do the gc here
- if (windowSum == 0) {
- LOG.debug("gc window of metric: {} userName: {}",
- metricName, userName);
- iterator.remove();
- continue;
- }
- LOG.debug("offer window of metric: {} userName: {} sum: {}",
- metricName, userName, windowSum);
- topN.offer(new NameValuePair(userName, windowSum));
- }
- int n = topN.size();
- LOG.info("topN size for command " + metricName + " is: " + n);
- if (n == 0) {
+ public TopWindow snapshot(long time) {
+ TopWindow window = new TopWindow(windowLenMs);
+ if (LOG.isDebugEnabled()) {
+ Set<String> metricNames = metricMap.keySet();
+ LOG.debug("iterating in reported metrics, size={} values={}",
+ metricNames.size(), metricNames);
+ }
+ for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
+ String metricName = entry.getKey();
+ RollingWindowMap rollingWindows = entry.getValue();
+ TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
+ final int size = topN.size();
+ if (size == 0) {
continue;
}
- String allMetricName =
- createMetricName(metricName, TopConf.ALL_USERS);
- map.put(allMetricName, Long.valueOf(topN.total));
- for (int i = 0; i < n; i++) {
- NameValuePair userEntry = topN.poll();
- String userMetricName =
- createMetricName(metricName, userEntry.name);
- map.put(userMetricName, Long.valueOf(userEntry.value));
+ Op op = new Op(metricName, topN.getTotal());
+ window.addOp(op);
+ // Reverse the users from the TopUsers using a stack,
+ // since we'd like them sorted in descending rather than ascending order
+ Stack<NameValuePair> reverse = new Stack<NameValuePair>();
+ for (int i = 0; i < size; i++) {
+ reverse.push(topN.poll());
}
+ for (int i = 0; i < size; i++) {
+ NameValuePair userEntry = reverse.pop();
+ User user = new User(userEntry.name, Long.valueOf(userEntry.value));
+ op.addUser(user);
+ }
+ }
+ return window;
+ }
+
+ /**
+ * Calculates the top N users over a time interval.
+ *
+ * @param time the current time
+ * @param metricName Name of metric
+ * @return
+ */
+ private TopN getTopUsersForMetric(long time, String metricName,
+ RollingWindowMap rollingWindows) {
+ TopN topN = new TopN(topUsersCnt);
+ Iterator<Map.Entry<String, RollingWindow>> iterator =
+ rollingWindows.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, RollingWindow> entry = iterator.next();
+ String userName = entry.getKey();
+ RollingWindow aWindow = entry.getValue();
+ long windowSum = aWindow.getSum(time);
+ // do the gc here
+ if (windowSum == 0) {
+ LOG.debug("gc window of metric: {} userName: {}",
+ metricName, userName);
+ iterator.remove();
+ continue;
+ }
+ LOG.debug("offer window of metric: {} userName: {} sum: {}",
+ metricName, userName, windowSum);
+ topN.offer(new NameValuePair(userName, windowSum));
}
- return map;
+ LOG.info("topN size for command {} is: {}", metricName, topN.size());
+ return topN;
}
/**
@@ -190,7 +278,8 @@ public class RollingWindowManager {
}
/**
- * A pair of a name and its corresponding value
+ * A pair of a name and its corresponding value. Defines a custom
+ * comparator so the TopN PriorityQueue sorts based on the count.
*/
static private class NameValuePair implements Comparable<NameValuePair> {
String name;
@@ -254,12 +343,4 @@ public class RollingWindowManager {
return total;
}
}
-
- /**
- * A mapping from metric names to their absolute values and their percentage
- */
- @InterfaceAudience.Private
- public static class MetricValueMap extends HashMap<String, Number> {
- private static final long serialVersionUID = 8936732010242400171L;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
index 39e1165..3703c2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
@@ -96,6 +96,8 @@ public class TestFSNamesystemMBean {
"MaxObjects"));
Integer numStaleStorages = (Integer) (mbs.getAttribute(
mxbeanNameFsns, "NumStaleStorages"));
+ String topUsers =
+ (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
// Metrics that belong to "NameNodeInfo".
// These are metrics that FSNamesystem registers directly with MBeanServer.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index 03ade90..c649621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.util.VersionInfo;
+import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
@@ -38,10 +41,15 @@ import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -257,4 +265,112 @@ public class TestNameNodeMXBean {
}
}
}
+
+ @Test(timeout=120000)
+ @SuppressWarnings("unchecked")
+ public void testTopUsers() throws Exception {
+ final Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster.waitActive();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanNameFsns = new ObjectName(
+ "Hadoop:service=NameNode,name=FSNamesystemState");
+ FileSystem fs = cluster.getFileSystem();
+ final Path path = new Path("/");
+ final int NUM_OPS = 10;
+ for (int i=0; i< NUM_OPS; i++) {
+ fs.listStatus(path);
+ fs.setTimes(path, 0, 1);
+ }
+ String topUsers =
+ (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, Object> map = mapper.readValue(topUsers, Map.class);
+ assertTrue("Could not find map key timestamp",
+ map.containsKey("timestamp"));
+ assertTrue("Could not find map key windows", map.containsKey("windows"));
+ List<Map<String, List<Map<String, Object>>>> windows =
+ (List<Map<String, List<Map<String, Object>>>>) map.get("windows");
+ assertEquals("Unexpected num windows", 3, windows.size());
+ for (Map<String, List<Map<String, Object>>> window : windows) {
+ final List<Map<String, Object>> ops = window.get("ops");
+ assertEquals("Unexpected num ops", 3, ops.size());
+ for (Map<String, Object> op: ops) {
+ final long count = Long.parseLong(op.get("totalCount").toString());
+ final String opType = op.get("opType").toString();
+ final int expected;
+ if (opType.equals(TopConf.ALL_CMDS)) {
+ expected = 2*NUM_OPS;
+ } else {
+ expected = NUM_OPS;
+ }
+ assertEquals("Unexpected total count", expected, count);
+ }
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testTopUsersDisabled() throws Exception {
+ final Configuration conf = new Configuration();
+ // Disable nntop
+ conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, false);
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster.waitActive();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanNameFsns = new ObjectName(
+ "Hadoop:service=NameNode,name=FSNamesystemState");
+ FileSystem fs = cluster.getFileSystem();
+ final Path path = new Path("/");
+ final int NUM_OPS = 10;
+ for (int i=0; i< NUM_OPS; i++) {
+ fs.listStatus(path);
+ fs.setTimes(path, 0, 1);
+ }
+ String topUsers =
+ (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
+ assertNull("Did not expect to find TopUserOpCounts bean!", topUsers);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testTopUsersNoPeriods() throws Exception {
+ final Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, true);
+ conf.set(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, "");
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+ cluster.waitActive();
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanNameFsns = new ObjectName(
+ "Hadoop:service=NameNode,name=FSNamesystemState");
+ FileSystem fs = cluster.getFileSystem();
+ final Path path = new Path("/");
+ final int NUM_OPS = 10;
+ for (int i=0; i< NUM_OPS; i++) {
+ fs.listStatus(path);
+ fs.setTimes(path, 0, 1);
+ }
+ String topUsers =
+ (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
+ assertNotNull("Expected TopUserOpCounts bean!", topUsers);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index c028a4a..6c37822 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -47,10 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
-import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -58,7 +55,6 @@ import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -93,11 +89,6 @@ public class TestNameNodeMetrics {
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
.getLogger().setLevel(Level.DEBUG);
- /**
- * need it to test {@link #testTopAuditLogger}
- */
- CONF.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
- TopAuditLogger.class.getName());
}
private MiniDFSCluster cluster;
@@ -112,7 +103,6 @@ public class TestNameNodeMetrics {
@Before
public void setUp() throws Exception {
- TopMetrics.reset();//reset the static init done by prev test
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
cluster.waitActive();
namesystem = cluster.getNamesystem();
@@ -465,53 +455,4 @@ public class TestNameNodeMetrics {
assertQuantileGauges("Syncs1s", rb);
assertQuantileGauges("BlockReport1s", rb);
}
-
- /**
- * Test whether {@link TopMetrics} is registered with metrics system
- * @throws Exception
- */
- @Test
- public void testTopMetrics() throws Exception {
- final String testUser = "NNTopTestUser";
- final String testOp = "NNTopTestOp";
- final String metricName =
- RollingWindowManager.createMetricName(testOp, testUser);
- TopMetrics.getInstance().report(testUser, testOp);
- final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
- MetricsRecordBuilder rb = getMetrics(regName);
- assertGauge(metricName, 1L, rb);
- }
-
- /**
- * Test whether {@link TopAuditLogger} is registered as an audit logger
- * @throws Exception
- */
- @Test
- public void testTopAuditLogger() throws Exception {
- //note: the top audit logger should already be set in conf
- //issue one command, any command is fine
- FileSystem fs = cluster.getFileSystem();
- long time = System.currentTimeMillis();
- fs.setTimes(new Path("/"), time, time);
- //the command should be reflected in the total count of all users
- final String testUser = TopConf.ALL_USERS;
- final String testOp = TopConf.CMD_TOTAL;
- final String metricName =
- RollingWindowManager.createMetricName(testOp, testUser);
- final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
- MetricsRecordBuilder rb = getMetrics(regName);
- assertGaugeGreaterThan(metricName, 1L, rb);
- }
-
- /**
- * Assert a long gauge metric greater than
- * @param name of the metric
- * @param expected minimum expected value of the metric
- * @param rb the record builder mock used to getMetrics
- */
- public static void assertGaugeGreaterThan(String name, long expected,
- MetricsRecordBuilder rb) {
- Assert.assertTrue("Bad value for metric " + name,
- expected <= MetricsAsserts.getLongGauge(name, rb));
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
index de21714..494ed08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
@@ -17,16 +17,19 @@
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
+import static org.junit.Assert.assertEquals;
public class TestRollingWindowManager {
@@ -61,33 +64,39 @@ public class TestRollingWindowManager {
for (int i = 0; i < users.length; i++)
manager.recordMetric(time, "close", users[i], i + 1);
time++;
- MetricValueMap tops = manager.snapshot(time);
+ TopWindow tops = manager.snapshot(time);
- assertEquals("The number of returned top metrics is invalid",
- 2 * (N_TOP_USERS + 1), tops.size());
- int userIndex = users.length - 2;
- String metricName = RollingWindowManager.createMetricName("open",
- users[userIndex]);
- boolean includes = tops.containsKey(metricName);
- assertTrue("The order of entries in top metrics is wrong", includes);
- assertEquals("The reported value by top is different from recorded one",
- (userIndex + 1) * 2, ((Long) tops.get(metricName)).longValue());
+ assertEquals("Unexpected number of ops", 2, tops.getOps().size());
+ for (Op op : tops.getOps()) {
+ final List<User> topUsers = op.getTopUsers();
+ assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
+ if (op.getOpType() == "open") {
+ for (int i = 0; i < topUsers.size(); i++) {
+ User user = topUsers.get(i);
+ assertEquals("Unexpected count for user " + user.getUser(),
+ (users.length-i)*2, user.getCount());
+ }
+ // Closed form of sum(range(2,42,2))
+ assertEquals("Unexpected total count for op",
+ (2+(users.length*2))*(users.length/2),
+ op.getTotalCount());
+ }
+ }
// move the window forward not to see the "open" results
time += WINDOW_LEN_MS - 2;
- // top should not include only "close" results
tops = manager.snapshot(time);
- assertEquals("The number of returned top metrics is invalid",
- N_TOP_USERS + 1, tops.size());
- includes = tops.containsKey(metricName);
- assertFalse("After rolling, the top list still includes the stale metrics",
- includes);
-
- metricName = RollingWindowManager.createMetricName("close",
- users[userIndex]);
- includes = tops.containsKey(metricName);
- assertTrue("The order of entries in top metrics is wrong", includes);
- assertEquals("The reported value by top is different from recorded one",
- (userIndex + 1), ((Long) tops.get(metricName)).longValue());
+ assertEquals("Unexpected number of ops", 1, tops.getOps().size());
+ final Op op = tops.getOps().get(0);
+ assertEquals("Should only see close ops", "close", op.getOpType());
+ final List<User> topUsers = op.getTopUsers();
+ for (int i = 0; i < topUsers.size(); i++) {
+ User user = topUsers.get(i);
+ assertEquals("Unexpected count for user " + user.getUser(),
+ (users.length-i), user.getCount());
+ }
+ // Closed form of sum(range(1,21))
+ assertEquals("Unexpected total count for op",
+ (1 + users.length) * (users.length / 2), op.getTotalCount());
}
}