Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/15 19:36:34 UTC

[43/50] [abbrv] hadoop git commit: HDFS-7426. Change nntop JMX format to be a JSON blob.

HDFS-7426. Change nntop JMX format to be a JSON blob.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa7b9248
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa7b9248
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa7b9248

Branch: refs/heads/YARN-2139
Commit: fa7b9248e415c04bb555772f44fadaf8d9f34974
Parents: e5a6925
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Dec 12 17:04:33 2014 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Dec 12 17:04:33 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hdfs/server/namenode/FSNamesystem.java      |  38 +++-
 .../namenode/metrics/FSNamesystemMBean.java     |   7 +
 .../server/namenode/top/TopAuditLogger.java     |  20 +-
 .../hdfs/server/namenode/top/TopConf.java       |  29 +--
 .../server/namenode/top/metrics/TopMetrics.java | 216 ++++--------------
 .../top/window/RollingWindowManager.java        | 223 +++++++++++++------
 .../server/namenode/TestFSNamesystemMBean.java  |   2 +
 .../server/namenode/TestNameNodeMXBean.java     | 116 ++++++++++
 .../namenode/metrics/TestNameNodeMetrics.java   |  59 -----
 .../top/window/TestRollingWindowManager.java    |  63 +++---
 11 files changed, 417 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index eeedb0d..9dfecc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -455,6 +455,8 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7509. Avoid resolving path multiple times. (jing9)
 
+    HDFS-7426. Change nntop JMX format to be a JSON blob. (wang)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b4b897a..1ac19fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -120,6 +120,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -241,6 +242,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -281,6 +283,7 @@ import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
 import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -539,6 +542,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private final FSImage fsImage;
 
+  private final TopConf topConf;
+  private TopMetrics topMetrics;
+
   /**
    * Notify that loading of this FSDirectory is complete, and
    * it is imageLoaded for use
@@ -842,6 +848,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.snapshotManager = new SnapshotManager(dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
       this.safeMode = new SafeModeInfo(conf);
+      this.topConf = new TopConf(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
         auditLoggers.get(0) instanceof DefaultAuditLogger;
@@ -952,13 +959,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     // Add audit logger to calculate top users
-    if (conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
-        DFSConfigKeys.NNTOP_ENABLED_DEFAULT)) {
-      String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
-      TopConf nntopConf = new TopConf(conf);
-      TopMetrics.initSingleton(conf, NamenodeRole.NAMENODE.name(), sessionId,
-          nntopConf.nntopReportingPeriodsMs);
-      auditLoggers.add(new TopAuditLogger());
+    if (topConf.isEnabled) {
+      topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
+      auditLoggers.add(new TopAuditLogger(topMetrics));
     }
 
     return Collections.unmodifiableList(auditLoggers);
@@ -6013,6 +6016,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return getBlockManager().getDatanodeManager().getNumStaleStorages();
   }
 
+  @Override // FSNamesystemMBean
+  public String getTopUserOpCounts() {
+    if (!topConf.isEnabled) {
+      return null;
+    }
+
+    Date now = new Date();
+    final List<RollingWindowManager.TopWindow> topWindows =
+        topMetrics.getTopWindows();
+    Map<String, Object> topMap = new TreeMap<String, Object>();
+    topMap.put("windows", topWindows);
+    topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(topMap);
+    } catch (IOException e) {
+      LOG.warn("Failed to fetch TopUser metrics", e);
+    }
+    return null;
+  }
+
   /**
    * Increments, logs and then returns the stamp
    */
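
Not part of the patch: a minimal standalone sketch of the JSON blob that the new getTopUserOpCounts emits, assuming Jackson's default bean serialization of the TopWindow/Op/User getters added below in RollingWindowManager. The class name, sample values and timestamp are illustrative only.

import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
import org.codehaus.jackson.map.ObjectMapper;

public class TopUserOpCountsShape {
  public static void main(String[] args) throws Exception {
    // Build one window the same way a RollingWindowManager snapshot would.
    TopWindow window = new TopWindow(60000);   // 1-minute window
    Op op = new Op("listStatus", 10);          // opType, totalCount
    op.addUser(new User("alice", 10));
    window.addOp(op);

    // Mirror FSNamesystem#getTopUserOpCounts: a TreeMap with "windows" and
    // "timestamp", serialized with Jackson's ObjectMapper.
    Map<String, Object> topMap = new TreeMap<String, Object>();
    topMap.put("windows", Collections.singletonList(window));
    topMap.put("timestamp", "2014-12-12T17:04:33-0800"); // illustrative ISO-8601 stamp
    System.out.println(new ObjectMapper().writeValueAsString(topMap));
    // Prints roughly:
    // {"timestamp":"2014-12-12T17:04:33-0800","windows":[{"ops":[{"opType":
    //  "listStatus","topUsers":[{"user":"alice","count":10}],"totalCount":10}],
    //  "windowLenMs":60000}]}
  }
}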

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
index 708591b..86f4bd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
@@ -164,4 +164,11 @@ public interface FSNamesystemMBean {
    */
   public int getNumStaleStorages();
 
+  /**
+   * Returns a nested JSON object listing the top users for different RPC 
+   * operations over tracked time windows.
+   * 
+   * @return JSON string
+   */
+  public String getTopUserOpCounts();
 }
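
As a hedged client-side sketch (not in this commit, mirroring the TestNameNodeMXBean changes further down), the new attribute can be read off the FSNamesystemState MBean like this:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class TopUserOpCountsReader {
  public static void main(String[] args) throws Exception {
    // Inside the NameNode JVM (e.g. a test running a MiniDFSCluster),
    // the attribute is exposed on the FSNamesystemState bean.
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName fsns = new ObjectName(
        "Hadoop:service=NameNode,name=FSNamesystemState");
    String topUsers = (String) mbs.getAttribute(fsns, "TopUserOpCounts");
    // null when nntop is disabled (DFSConfigKeys.NNTOP_ENABLED_KEY = false),
    // otherwise the JSON blob described above.
    System.out.println(topUsers);
  }
}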

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
index 4f26b17..49c9153 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.top;
 
 import java.net.InetAddress;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,6 +37,14 @@ import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 public class TopAuditLogger implements AuditLogger {
   public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class);
 
+  private final TopMetrics topMetrics;
+
+  public TopAuditLogger(TopMetrics topMetrics) {
+    Preconditions.checkNotNull(topMetrics, "Cannot init with a null " +
+        "TopMetrics");
+    this.topMetrics = topMetrics;
+  }
+
   @Override
   public void initialize(Configuration conf) {
   }
@@ -43,12 +52,11 @@ public class TopAuditLogger implements AuditLogger {
   @Override
   public void logAuditEvent(boolean succeeded, String userName,
       InetAddress addr, String cmd, String src, String dst, FileStatus status) {
-
-    TopMetrics instance = TopMetrics.getInstance();
-    if (instance != null) {
-      instance.report(succeeded, userName, addr, cmd, src, dst, status);
-    } else {
-      LOG.error("TopMetrics is not initialized yet!");
+    try {
+      topMetrics.report(succeeded, userName, addr, cmd, src, dst, status);
+    } catch (Throwable t) {
+      LOG.error("An error occurred while reflecting the event in top service, "
+          + "event: (cmd={},userName={})", cmd, userName);
     }
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
index 0f4ebac..ba82032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.top;
 
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -27,34 +30,34 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 public final class TopConf {
-
-  public static final String TOP_METRICS_REGISTRATION_NAME = "topusers";
-  public static final String TOP_METRICS_RECORD_NAME = "topparam";
   /**
-   * A meta command representing the total number of commands
+   * Whether TopMetrics are enabled
    */
-  public static final String CMD_TOTAL = "total";
+  public final boolean isEnabled;
+
   /**
-   * A meta user representing all users
+   * A meta command representing the total number of calls to all commands
    */
-  public static String ALL_USERS = "ALL";
+  public static final String ALL_CMDS = "*";
 
   /**
    * nntop reporting periods in milliseconds
    */
-  public final long[] nntopReportingPeriodsMs;
+  public final int[] nntopReportingPeriodsMs;
 
   public TopConf(Configuration conf) {
+    isEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
+        DFSConfigKeys.NNTOP_ENABLED_DEFAULT);
     String[] periodsStr = conf.getTrimmedStrings(
         DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY,
         DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT);
-    nntopReportingPeriodsMs = new long[periodsStr.length];
+    nntopReportingPeriodsMs = new int[periodsStr.length];
     for (int i = 0; i < periodsStr.length; i++) {
-      nntopReportingPeriodsMs[i] = Integer.parseInt(periodsStr[i]) *
-          60L * 1000L; //min to ms
+      nntopReportingPeriodsMs[i] = Ints.checkedCast(
+          TimeUnit.MINUTES.toMillis(Integer.parseInt(periodsStr[i])));
     }
-    for (long aPeriodMs: nntopReportingPeriodsMs) {
-      Preconditions.checkArgument(aPeriodMs >= 60L * 1000L,
+    for (int aPeriodMs: nntopReportingPeriodsMs) {
+      Preconditions.checkArgument(aPeriodMs >= TimeUnit.MINUTES.toMillis(1),
           "minimum reporting period is 1 min!");
     }
   }
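
A hedged usage sketch (not in the patch) of the reworked TopConf; the 1/5/25-minute windows echo the example in the TopMetrics javadoc below, and the class name is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;

public class TopConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, true);
    // Track three rolling windows of 1, 5 and 25 minutes.
    conf.set(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, "1,5,25");

    TopConf topConf = new TopConf(conf);
    // topConf.isEnabled is true and nntopReportingPeriodsMs is
    // {60000, 300000, 1500000}; any period shorter than one minute
    // fails the Preconditions check in the constructor.
    for (int periodMs : topConf.nntopReportingPeriodsMs) {
      System.out.println(periodMs + " ms");
    }
  }
}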

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
index e8a4e23..ab55392 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
@@ -17,67 +17,50 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.top.metrics;
 
-import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
-import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
-import static org.apache.hadoop.metrics2.lib.Interns.info;
-
 import java.net.InetAddress;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
 
-/***
- * The interface to the top metrics
+/**
+ * The interface to the top metrics.
+ * <p/>
+ * Metrics are collected by a custom audit logger, {@link org.apache.hadoop
+ * .hdfs.server.namenode.top.TopAuditLogger}, which calls TopMetrics to
+ * increment per-operation, per-user counts on every audit log call. These
+ * counts are used to show the top users by NameNode operation as well as
+ * across all operations.
+ * <p/>
+ * TopMetrics maintains these counts for a configurable number of time
+ * intervals, e.g. 1min, 5min, 25min. Each interval is tracked by a
+ * RollingWindowManager.
  * <p/>
- * The producers use the {@link #report} method to report events and the
- * consumers use {@link #getMetrics(MetricsCollector, boolean)} to retrieve the
- * current top metrics. The default consumer is JMX but it could be any other
- * user interface.
+ * These metrics are published as a JSON string via {@link org.apache.hadoop
+ * .hdfs.server .namenode.metrics.FSNamesystemMBean#getTopWindows}. This is
+ * done by calling {@link org.apache.hadoop.hdfs.server.namenode.top.window
+ * .RollingWindowManager#snapshot} on each RollingWindowManager.
  * <p/>
  * Thread-safe: relies on thread-safety of RollingWindowManager
  */
 @InterfaceAudience.Private
-public class TopMetrics implements MetricsSource {
+public class TopMetrics {
   public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
 
-  enum Singleton {
-    INSTANCE;
-
-    volatile TopMetrics impl = null;
-
-    synchronized TopMetrics init(Configuration conf, String processName,
-        String sessionId, long[] reportingPeriods) {
-      if (impl == null) {
-        impl =
-            create(conf, processName, sessionId, reportingPeriods,
-                DefaultMetricsSystem.instance());
-      }
-      logConf(conf);
-      return impl;
-    }
-  }
-
   private static void logConf(Configuration conf) {
     LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
         " = " +  conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
@@ -88,127 +71,34 @@ public class TopMetrics implements MetricsSource {
   }
 
   /**
-   * Return only the shortest periods for default
-   * TODO: make it configurable
-   */
-  final boolean smallestOnlyDefault = true;
-
-  /**
-   * The smallest of reporting periods
-   */
-  long smallestPeriod = Long.MAX_VALUE;
-
-  /**
-   * processName and sessionId might later be leveraged later when we aggregate
-   * report from multiple federated name nodes
-   */
-  final String processName, sessionId;
-
-  /**
    * A map from reporting periods to WindowManager. Thread-safety is provided by
    * the fact that the mapping is not changed after construction.
    */
-  final Map<Long, RollingWindowManager> rollingWindowManagers =
-      new HashMap<Long, RollingWindowManager>();
+  final Map<Integer, RollingWindowManager> rollingWindowManagers =
+      new HashMap<Integer, RollingWindowManager>();
 
-  TopMetrics(Configuration conf, String processName, String sessionId,
-      long[] reportingPeriods) {
-    this.processName = processName;
-    this.sessionId = sessionId;
+  public TopMetrics(Configuration conf, int[] reportingPeriods) {
+    logConf(conf);
     for (int i = 0; i < reportingPeriods.length; i++) {
-      smallestPeriod = Math.min(smallestPeriod, reportingPeriods[i]);
       rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
           conf, reportingPeriods[i]));
     }
   }
 
-  public static TopMetrics create(Configuration conf, String processName,
-      String sessionId, long[] reportingPeriods, MetricsSystem ms) {
-    return ms.register(TopConf.TOP_METRICS_REGISTRATION_NAME,
-        "top metrics of the namenode in a last period of time", new TopMetrics(
-            conf, processName, sessionId, reportingPeriods));
-  }
-
-  public static TopMetrics initSingleton(Configuration conf,
-      String processName, String sessionId, long[] reportingPeriods) {
-    return Singleton.INSTANCE.init(conf, processName, sessionId,
-        reportingPeriods);
-  }
-
-  public static TopMetrics getInstance() {
-    TopMetrics topMetrics = Singleton.INSTANCE.impl;
-    Preconditions.checkArgument(topMetrics != null,
-          "The TopMetric singleton instance is not initialized."
-              + " Have you called initSingleton first?");
-    return topMetrics;
-  }
-
   /**
-   * In testing, the previous initialization should be reset if the entire
-   * metric system is reinitialized
+   * Get a list of the current TopWindow statistics, one TopWindow per tracked
+   * time interval.
    */
-  @VisibleForTesting
-  public static void reset() {
-    Singleton.INSTANCE.impl = null;
-  }
-
-  @Override
-  public void getMetrics(MetricsCollector collector, boolean all) {
-    long realTime = Time.monotonicNow();
-    getMetrics(smallestOnlyDefault, realTime, collector, all);
-  }
-
-  public void getMetrics(boolean smallestOnly, long currTime,
-      MetricsCollector collector, boolean all) {
-    for (Entry<Long, RollingWindowManager> entry : rollingWindowManagers
+  public List<TopWindow> getTopWindows() {
+    long monoTime = Time.monotonicNow();
+    List<TopWindow> windows = Lists.newArrayListWithCapacity
+        (rollingWindowManagers.size());
+    for (Entry<Integer, RollingWindowManager> entry : rollingWindowManagers
         .entrySet()) {
-      if (!smallestOnly || smallestPeriod == entry.getKey()) {
-        getMetrics(currTime, collector, entry.getKey(), entry.getValue(), all);
-      }
-    }
-  }
-
-  /**
-   * Get metrics for a particular recording period and its corresponding
-   * {@link RollingWindowManager}
-   * <p/>
-   *
-   * @param collector the metric collector
-   * @param period the reporting period
-   * @param rollingWindowManager the window manager corresponding to the
-   *          reporting period
-   * @param all currently ignored
-   */
-  void getMetrics(long currTime, MetricsCollector collector, Long period,
-      RollingWindowManager rollingWindowManager, boolean all) {
-    MetricsRecordBuilder rb =
-        collector.addRecord(createTopMetricsRecordName(period))
-            .setContext("namenode").tag(ProcessName, processName)
-            .tag(SessionId, sessionId);
-
-    MetricValueMap snapshotMetrics = rollingWindowManager.snapshot(currTime);
-    LOG.debug("calling snapshot, result size is: " + snapshotMetrics.size());
-    for (Map.Entry<String, Number> entry : snapshotMetrics.entrySet()) {
-      String key = entry.getKey();
-      Number value = entry.getValue();
-      LOG.debug("checking an entry: key: {} value: {}", key, value);
-      long min = period / 1000L / 60L; //ms -> min
-      String desc = "top user of name node in the past " + min + " minutes";
-
-      if (value instanceof Integer) {
-        rb.addGauge(info(key, desc), (Integer) value);
-      } else if (value instanceof Long) {
-        rb.addGauge(info(key, desc), (Long) value);
-      } else if (value instanceof Float) {
-        rb.addGauge(info(key, desc), (Float) value);
-      } else if (value instanceof Double) {
-        rb.addGauge(info(key, desc), (Double) value);
-      } else {
-        LOG.warn("Unsupported metric type: " + value.getClass());
-      }
+      TopWindow window = entry.getValue().snapshot(monoTime);
+      windows.add(window);
     }
-    LOG.debug("END iterating over metrics, result size is: {}",
-        snapshotMetrics.size());
+    return windows;
   }
 
   /**
@@ -216,18 +106,10 @@ public class TopMetrics implements MetricsSource {
    * log file. This is to be consistent when {@link TopMetrics} is charged with
    * data read back from log files instead of being invoked directly by the
    * FsNamesystem
-   *
-   * @param succeeded
-   * @param userName
-   * @param addr
-   * @param cmd
-   * @param src
-   * @param dst
-   * @param status
    */
   public void report(boolean succeeded, String userName, InetAddress addr,
       String cmd, String src, String dst, FileStatus status) {
-    //currently we nntop makes use of only the username and the command
+    // currently nntop only makes use of the username and the command
     report(userName, cmd);
   }
 
@@ -239,27 +121,11 @@ public class TopMetrics implements MetricsSource {
   public void report(long currTime, String userName, String cmd) {
     LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
     userName = UserGroupInformation.trimLoginMethod(userName);
-    try {
-      for (RollingWindowManager rollingWindowManager : rollingWindowManagers
-          .values()) {
-        rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
-        rollingWindowManager.recordMetric(currTime,
-            TopConf.CMD_TOTAL, userName, 1);
-      }
-    } catch (Throwable t) {
-      LOG.error("An error occurred while reflecting the event in top service, "
-          + "event: (time,cmd,userName)=(" + currTime + "," + cmd + ","
-          + userName);
+    for (RollingWindowManager rollingWindowManager : rollingWindowManagers
+        .values()) {
+      rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
+      rollingWindowManager.recordMetric(currTime,
+          TopConf.ALL_CMDS, userName, 1);
     }
   }
-
-  /***
-   *
-   * @param period the reporting period length in ms
-   * @return
-   */
-  public static String createTopMetricsRecordName(Long period) {
-    return TopConf.TOP_METRICS_RECORD_NAME + "-" + period;
-  }
-
 }
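
To show the new non-singleton flow end to end, a minimal sketch (not part of the commit), assuming the public constructor, the three-argument report overload and getTopWindows shown above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
import org.apache.hadoop.util.Time;

public class TopMetricsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    TopConf topConf = new TopConf(conf);
    // FSNamesystem now constructs TopMetrics directly instead of via a singleton.
    TopMetrics metrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);

    // TopAuditLogger ends up here on every audit event; only user and cmd are used.
    metrics.report(Time.monotonicNow(), "alice", "listStatus");

    // FSNamesystem#getTopUserOpCounts calls this before serializing to JSON.
    for (TopWindow window : metrics.getTopWindows()) {
      System.out.println(window.getWindowLenMs() + " ms window, "
          + window.getOps().size() + " op(s)");
    }
  }
}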

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
index d818cce..00e7087 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
@@ -17,21 +17,22 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.top.window;
 
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.Stack;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * A class to manage the set of {@link RollingWindow}s. This class is the
@@ -46,25 +47,93 @@ public class RollingWindowManager {
   public static final Logger LOG = LoggerFactory.getLogger(
       RollingWindowManager.class);
 
-  private int windowLenMs;
-  private int bucketsPerWindow; // e.g., 10 buckets per minute
-  private int topUsersCnt; // e.g., report top 10 metrics
+  private final int windowLenMs;
+  private final int bucketsPerWindow; // e.g., 10 buckets per minute
+  private final int topUsersCnt; // e.g., report top 10 metrics
+
+  static private class RollingWindowMap extends
+      ConcurrentHashMap<String, RollingWindow> {
+    private static final long serialVersionUID = -6785807073237052051L;
+  }
 
   /**
-   * Create a metric name composed of the command and user
-   *
-   * @param command the command executed
-   * @param user    the user
-   * @return a composed metric name
+   * Represents a snapshot of the rolling window. It contains one Op per 
+   * operation in the window, with ranked users for each Op.
    */
-  @VisibleForTesting
-  public static String createMetricName(String command, String user) {
-    return command + "." + user;
+  public static class TopWindow {
+    private final int windowMillis;
+    private final List<Op> top;
+
+    public TopWindow(int windowMillis) {
+      this.windowMillis = windowMillis;
+      this.top = Lists.newArrayList();
+    }
+
+    public void addOp(Op op) {
+      top.add(op);
+    }
+
+    public int getWindowLenMs() {
+      return windowMillis;
+    }
+
+    public List<Op> getOps() {
+      return top;
+    }
   }
 
-  static private class RollingWindowMap extends
-      ConcurrentHashMap<String, RollingWindow> {
-    private static final long serialVersionUID = -6785807073237052051L;
+  /**
+   * Represents an operation within a TopWindow. It contains a ranked 
+   * set of the top users for the operation.
+   */
+  public static class Op {
+    private final String opType;
+    private final List<User> topUsers;
+    private final long totalCount;
+
+    public Op(String opType, long totalCount) {
+      this.opType = opType;
+      this.topUsers = Lists.newArrayList();
+      this.totalCount = totalCount;
+    }
+
+    public void addUser(User u) {
+      topUsers.add(u);
+    }
+
+    public String getOpType() {
+      return opType;
+    }
+
+    public List<User> getTopUsers() {
+      return topUsers;
+    }
+
+    public long getTotalCount() {
+      return totalCount;
+    }
+  }
+
+  /**
+   * Represents a user who called an Op within a TopWindow. Specifies the 
+   * user and the number of times the user called the operation.
+   */
+  public static class User {
+    private final String user;
+    private final long count;
+
+    public User(String user, long count) {
+      this.user = user;
+      this.count = count;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public long getCount() {
+      return count;
+    }
   }
 
   /**
@@ -75,8 +144,9 @@ public class RollingWindowManager {
   public ConcurrentHashMap<String, RollingWindowMap> metricMap =
       new ConcurrentHashMap<String, RollingWindowMap>();
 
-  public RollingWindowManager(Configuration conf, long reportingPeriodMs) {
-    windowLenMs = (int) reportingPeriodMs;
+  public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
+    
+    windowLenMs = reportingPeriodMs;
     bucketsPerWindow =
         conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY,
             DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT);
@@ -112,53 +182,71 @@ public class RollingWindowManager {
    * Take a snapshot of current top users in the past period.
    *
    * @param time the current time
-   * @return a map between the top metrics and their values. The user is encoded
-   * in the metric name. Refer to {@link RollingWindowManager#createMetricName} for
-   * the actual format.
+   * @return a TopWindow describing the top users for each metric in the 
+   * window.
    */
-  public MetricValueMap snapshot(long time) {
-    MetricValueMap map = new MetricValueMap();
-    Set<String> metricNames = metricMap.keySet();
-    LOG.debug("iterating in reported metrics, size={} values={}",
-        metricNames.size(), metricNames);
-    for (Map.Entry<String,RollingWindowMap> rwEntry: metricMap.entrySet()) {
-      String metricName = rwEntry.getKey();
-      RollingWindowMap rollingWindows = rwEntry.getValue();
-      TopN topN = new TopN(topUsersCnt);
-      Iterator<Map.Entry<String, RollingWindow>> iterator =
-          rollingWindows.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<String, RollingWindow> entry = iterator.next();
-        String userName = entry.getKey();
-        RollingWindow aWindow = entry.getValue();
-        long windowSum = aWindow.getSum(time);
-        // do the gc here
-        if (windowSum == 0) {
-          LOG.debug("gc window of metric: {} userName: {}",
-              metricName, userName);
-          iterator.remove();
-          continue;
-        }
-        LOG.debug("offer window of metric: {} userName: {} sum: {}",
-            metricName, userName, windowSum);
-        topN.offer(new NameValuePair(userName, windowSum));
-      }
-      int n = topN.size();
-      LOG.info("topN size for command " + metricName + " is: " + n);
-      if (n == 0) {
+  public TopWindow snapshot(long time) {
+    TopWindow window = new TopWindow(windowLenMs);
+    if (LOG.isDebugEnabled()) {
+      Set<String> metricNames = metricMap.keySet();
+      LOG.debug("iterating in reported metrics, size={} values={}",
+          metricNames.size(), metricNames);
+    }
+    for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
+      String metricName = entry.getKey();
+      RollingWindowMap rollingWindows = entry.getValue();
+      TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
+      final int size = topN.size();
+      if (size == 0) {
         continue;
       }
-      String allMetricName =
-          createMetricName(metricName, TopConf.ALL_USERS);
-      map.put(allMetricName, Long.valueOf(topN.total));
-      for (int i = 0; i < n; i++) {
-        NameValuePair userEntry = topN.poll();
-        String userMetricName =
-            createMetricName(metricName, userEntry.name);
-        map.put(userMetricName, Long.valueOf(userEntry.value));
+      Op op = new Op(metricName, topN.getTotal());
+      window.addOp(op);
+      // Reverse the users from the TopUsers using a stack, 
+      // since we'd like them sorted in descending rather than ascending order
+      Stack<NameValuePair> reverse = new Stack<NameValuePair>();
+      for (int i = 0; i < size; i++) {
+        reverse.push(topN.poll());
       }
+      for (int i = 0; i < size; i++) {
+        NameValuePair userEntry = reverse.pop();
+        User user = new User(userEntry.name, Long.valueOf(userEntry.value));
+        op.addUser(user);
+      }
+    }
+    return window;
+  }
+
+  /**
+   * Calculates the top N users over a time interval.
+   * 
+   * @param time the current time
+   * @param metricName Name of metric
+   * @return
+   */
+  private TopN getTopUsersForMetric(long time, String metricName, 
+      RollingWindowMap rollingWindows) {
+    TopN topN = new TopN(topUsersCnt);
+    Iterator<Map.Entry<String, RollingWindow>> iterator =
+        rollingWindows.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, RollingWindow> entry = iterator.next();
+      String userName = entry.getKey();
+      RollingWindow aWindow = entry.getValue();
+      long windowSum = aWindow.getSum(time);
+      // do the gc here
+      if (windowSum == 0) {
+        LOG.debug("gc window of metric: {} userName: {}",
+            metricName, userName);
+        iterator.remove();
+        continue;
+      }
+      LOG.debug("offer window of metric: {} userName: {} sum: {}",
+          metricName, userName, windowSum);
+      topN.offer(new NameValuePair(userName, windowSum));
     }
-    return map;
+    LOG.info("topN size for command {} is: {}", metricName, topN.size());
+    return topN;
   }
 
   /**
@@ -190,7 +278,8 @@ public class RollingWindowManager {
   }
 
   /**
-   * A pair of a name and its corresponding value
+   * A pair of a name and its corresponding value. Defines a custom 
+   * comparator so the TopN PriorityQueue sorts based on the count.
    */
   static private class NameValuePair implements Comparable<NameValuePair> {
     String name;
@@ -254,12 +343,4 @@ public class RollingWindowManager {
       return total;
     }
   }
-
-  /**
-   * A mapping from metric names to their absolute values and their percentage
-   */
-  @InterfaceAudience.Private
-  public static class MetricValueMap extends HashMap<String, Number> {
-    private static final long serialVersionUID = 8936732010242400171L;
-  }
 }
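
A hedged sketch (not in the patch) of driving one RollingWindowManager directly, in the spirit of the updated TestRollingWindowManager below; it assumes recordMetric(time, cmd, user, delta) keeps its existing signature:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;

public class RollingWindowManagerSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    RollingWindowManager manager = new RollingWindowManager(conf, 60 * 1000);

    long time = 0;
    manager.recordMetric(time, "open", "alice", 3);
    manager.recordMetric(time, "open", "bob", 1);

    TopWindow window = manager.snapshot(time + 1);
    for (Op op : window.getOps()) {
      // Users now come back ranked in descending order of count.
      for (User user : op.getTopUsers()) {
        System.out.println(op.getOpType() + ": " + user.getUser()
            + " = " + user.getCount());
      }
    }
  }
}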

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
index 39e1165..3703c2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java
@@ -96,6 +96,8 @@ public class TestFSNamesystemMBean {
             "MaxObjects"));
         Integer numStaleStorages = (Integer) (mbs.getAttribute(
             mxbeanNameFsns, "NumStaleStorages"));
+        String topUsers =
+            (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
 
         // Metrics that belong to "NameNodeInfo".
         // These are metrics that FSNamesystem registers directly with MBeanServer.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index 03ade90..c649621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
 import org.apache.hadoop.util.VersionInfo;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 import org.mortbay.util.ajax.JSON;
 
@@ -38,10 +41,15 @@ import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -257,4 +265,112 @@ public class TestNameNodeMXBean {
       }
     }
   }
+
+  @Test(timeout=120000)
+  @SuppressWarnings("unchecked")
+  public void testTopUsers() throws Exception {
+    final Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      cluster.waitActive();
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanNameFsns = new ObjectName(
+          "Hadoop:service=NameNode,name=FSNamesystemState");
+      FileSystem fs = cluster.getFileSystem();
+      final Path path = new Path("/");
+      final int NUM_OPS = 10;
+      for (int i=0; i< NUM_OPS; i++) {
+        fs.listStatus(path);
+        fs.setTimes(path, 0, 1);
+      }
+      String topUsers =
+          (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
+      ObjectMapper mapper = new ObjectMapper();
+      Map<String, Object> map = mapper.readValue(topUsers, Map.class);
+      assertTrue("Could not find map key timestamp", 
+          map.containsKey("timestamp"));
+      assertTrue("Could not find map key windows", map.containsKey("windows"));
+      List<Map<String, List<Map<String, Object>>>> windows =
+          (List<Map<String, List<Map<String, Object>>>>) map.get("windows");
+      assertEquals("Unexpected num windows", 3, windows.size());
+      for (Map<String, List<Map<String, Object>>> window : windows) {
+        final List<Map<String, Object>> ops = window.get("ops");
+        assertEquals("Unexpected num ops", 3, ops.size());
+        for (Map<String, Object> op: ops) {
+          final long count = Long.parseLong(op.get("totalCount").toString());
+          final String opType = op.get("opType").toString();
+          final int expected;
+          if (opType.equals(TopConf.ALL_CMDS)) {
+            expected = 2*NUM_OPS;
+          } else {
+            expected = NUM_OPS;
+          }
+          assertEquals("Unexpected total count", expected, count);
+        }
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testTopUsersDisabled() throws Exception {
+    final Configuration conf = new Configuration();
+    // Disable nntop
+    conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, false);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      cluster.waitActive();
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanNameFsns = new ObjectName(
+          "Hadoop:service=NameNode,name=FSNamesystemState");
+      FileSystem fs = cluster.getFileSystem();
+      final Path path = new Path("/");
+      final int NUM_OPS = 10;
+      for (int i=0; i< NUM_OPS; i++) {
+        fs.listStatus(path);
+        fs.setTimes(path, 0, 1);
+      }
+      String topUsers =
+          (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
+      assertNull("Did not expect to find TopUserOpCounts bean!", topUsers);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testTopUsersNoPeriods() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, true);
+    conf.set(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, "");
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      cluster.waitActive();
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanNameFsns = new ObjectName(
+          "Hadoop:service=NameNode,name=FSNamesystemState");
+      FileSystem fs = cluster.getFileSystem();
+      final Path path = new Path("/");
+      final int NUM_OPS = 10;
+      for (int i=0; i< NUM_OPS; i++) {
+        fs.listStatus(path);
+        fs.setTimes(path, 0, 1);
+      }
+      String topUsers =
+          (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
+      assertNotNull("Expected TopUserOpCounts bean!", topUsers);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index c028a4a..6c37822 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -47,10 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
-import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -58,7 +55,6 @@ import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -93,11 +89,6 @@ public class TestNameNodeMetrics {
     CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
       .getLogger().setLevel(Level.DEBUG);
-    /**
-     * need it to test {@link #testTopAuditLogger}
-     */
-    CONF.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
-        TopAuditLogger.class.getName());
   }
   
   private MiniDFSCluster cluster;
@@ -112,7 +103,6 @@ public class TestNameNodeMetrics {
   
   @Before
   public void setUp() throws Exception {
-    TopMetrics.reset();//reset the static init done by prev test
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
     cluster.waitActive();
     namesystem = cluster.getNamesystem();
@@ -465,53 +455,4 @@ public class TestNameNodeMetrics {
     assertQuantileGauges("Syncs1s", rb);
     assertQuantileGauges("BlockReport1s", rb);
   }
-
-  /**
-   * Test whether {@link TopMetrics} is registered with metrics system
-   * @throws Exception
-   */
-  @Test
-  public void testTopMetrics() throws Exception {
-    final String testUser = "NNTopTestUser";
-    final String testOp = "NNTopTestOp";
-    final String metricName =
-        RollingWindowManager.createMetricName(testOp, testUser);
-    TopMetrics.getInstance().report(testUser, testOp);
-    final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
-    MetricsRecordBuilder rb = getMetrics(regName);
-    assertGauge(metricName, 1L, rb);
-  }
-
-  /**
-   * Test whether {@link TopAuditLogger} is registered as an audit logger
-   * @throws Exception
-   */
-  @Test
-  public void testTopAuditLogger() throws Exception {
-    //note: the top audit logger should already be set in conf
-    //issue one command, any command is fine
-    FileSystem fs = cluster.getFileSystem();
-    long time = System.currentTimeMillis();
-    fs.setTimes(new Path("/"), time, time);
-    //the command should be reflected in the total count of all users
-    final String testUser = TopConf.ALL_USERS;
-    final String testOp = TopConf.CMD_TOTAL;
-    final String metricName =
-        RollingWindowManager.createMetricName(testOp, testUser);
-    final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
-    MetricsRecordBuilder rb = getMetrics(regName);
-    assertGaugeGreaterThan(metricName, 1L, rb);
-  }
-
-  /**
-   * Assert a long gauge metric greater than
-   * @param name  of the metric
-   * @param expected  minimum expected value of the metric
-   * @param rb  the record builder mock used to getMetrics
-   */
-  public static void assertGaugeGreaterThan(String name, long expected,
-                                 MetricsRecordBuilder rb) {
-    Assert.assertTrue("Bad value for metric " + name,
-        expected <= MetricsAsserts.getLongGauge(name, rb));
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7b9248/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
index de21714..494ed08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
@@ -17,16 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.top.window;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
+import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
+import static org.junit.Assert.assertEquals;
 
 public class TestRollingWindowManager {
 
@@ -61,33 +64,39 @@ public class TestRollingWindowManager {
     for (int i = 0; i < users.length; i++)
       manager.recordMetric(time, "close", users[i], i + 1);
     time++;
-    MetricValueMap tops = manager.snapshot(time);
+    TopWindow tops = manager.snapshot(time);
 
-    assertEquals("The number of returned top metrics is invalid",
-        2 * (N_TOP_USERS + 1), tops.size());
-    int userIndex = users.length - 2;
-    String metricName = RollingWindowManager.createMetricName("open",
-        users[userIndex]);
-    boolean includes = tops.containsKey(metricName);
-    assertTrue("The order of entries in top metrics is wrong", includes);
-    assertEquals("The reported value by top is different from recorded one",
-        (userIndex + 1) * 2, ((Long) tops.get(metricName)).longValue());
+    assertEquals("Unexpected number of ops", 2, tops.getOps().size());
+    for (Op op : tops.getOps()) {
+      final List<User> topUsers = op.getTopUsers();
+      assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
+      if (op.getOpType() == "open") {
+        for (int i = 0; i < topUsers.size(); i++) {
+          User user = topUsers.get(i);
+          assertEquals("Unexpected count for user " + user.getUser(),
+              (users.length-i)*2, user.getCount());
+        }
+        // Closed form of sum(range(2,42,2))
+        assertEquals("Unexpected total count for op", 
+            (2+(users.length*2))*(users.length/2),
+            op.getTotalCount());
+      }
+    }
 
     // move the window forward not to see the "open" results
     time += WINDOW_LEN_MS - 2;
-    // top should not include only "close" results
     tops = manager.snapshot(time);
-    assertEquals("The number of returned top metrics is invalid",
-        N_TOP_USERS + 1, tops.size());
-    includes = tops.containsKey(metricName);
-    assertFalse("After rolling, the top list still includes the stale metrics",
-        includes);
-
-    metricName = RollingWindowManager.createMetricName("close",
-        users[userIndex]);
-    includes = tops.containsKey(metricName);
-    assertTrue("The order of entries in top metrics is wrong", includes);
-    assertEquals("The reported value by top is different from recorded one",
-        (userIndex + 1), ((Long) tops.get(metricName)).longValue());
+    assertEquals("Unexpected number of ops", 1, tops.getOps().size());
+    final Op op = tops.getOps().get(0);
+    assertEquals("Should only see close ops", "close", op.getOpType());
+    final List<User> topUsers = op.getTopUsers();
+    for (int i = 0; i < topUsers.size(); i++) {
+      User user = topUsers.get(i);
+      assertEquals("Unexpected count for user " + user.getUser(),
+          (users.length-i), user.getCount());
+    }
+    // Closed form of sum(range(1,21))
+    assertEquals("Unexpected total count for op",
+        (1 + users.length) * (users.length / 2), op.getTotalCount());
   }
 }