You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2014/11/18 02:34:09 UTC

hadoop git commit: HDFS-6982. nntop: top-like tool for name node users. (Maysam Yabandeh via wang)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2fce6d614 -> dcb8e2442


HDFS-6982. nntop: top-like tool for name node users. (Maysam Yabandeh via wang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dcb8e244
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dcb8e244
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dcb8e244

Branch: refs/heads/trunk
Commit: dcb8e24427b02e2f3ff9a12d2eb1eb878e3443bb
Parents: 2fce6d6
Author: Andrew Wang <wa...@apache.org>
Authored: Mon Nov 17 17:31:42 2014 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Mon Nov 17 17:33:42 2014 -0800

----------------------------------------------------------------------
 .../hadoop/security/UserGroupInformation.java   |  17 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  15 ++
 .../hdfs/server/namenode/FSNamesystem.java      |  21 +-
 .../server/namenode/top/TopAuditLogger.java     |  74 ++++++
 .../hdfs/server/namenode/top/TopConf.java       |  61 +++++
 .../server/namenode/top/metrics/TopMetrics.java | 265 +++++++++++++++++++
 .../namenode/top/window/RollingWindow.java      | 189 +++++++++++++
 .../top/window/RollingWindowManager.java        | 265 +++++++++++++++++++
 .../src/main/resources/hdfs-default.xml         |  28 ++
 .../hdfs/server/namenode/TestAuditLogger.java   |  29 +-
 .../namenode/metrics/TestNameNodeMetrics.java   |  61 +++++
 .../namenode/top/window/TestRollingWindow.java  |  84 ++++++
 .../top/window/TestRollingWindowManager.java    |  93 +++++++
 14 files changed, 1200 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index 7a99391..0541f9d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -787,7 +787,22 @@ public class UserGroupInformation {
     }
     return loginUser;
   }
-  
+
+  /**
+   * Strip everything from the first space onward from the given user name,
+   * e.g. "jack (auth:SIMPLE)" -> "jack"; a name without a space is unchanged.
+   *
+   * @param userName the user name, optionally followed by a space and more text
+   * @return the part of userName that precedes its first space
+   */
+  public static String trimLoginMethod(String userName) {
+    int spaceIndex = userName.indexOf(' ');
+    if (spaceIndex >= 0) {
+      userName = userName.substring(0, spaceIndex);
+    }
+    return userName;
+  }
+
   /**
    * Log in a user using the given subject
    * @parma subject the subject to use when logging in a user, or null to 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8cbff7a..903b890 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -267,6 +267,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6663. Admin command to track file and locations from block id.
     (Chen He via kihwal)
 
+    HDFS-6982. nntop: top-like tool for name node users.
+    (Maysam Yabandeh via wang)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index acf5ec6..af18f4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -741,4 +741,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String IGNORE_SECURE_PORTS_FOR_TESTING_KEY =
       "ignore.secure.ports.for.testing";
   public static final boolean IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT = false;
+
+  // nntop Configurations
+  public static final String NNTOP_ENABLED_KEY =
+      "dfs.namenode.top.enabled";
+  public static final boolean NNTOP_ENABLED_DEFAULT = true;
+  public static final String NNTOP_BUCKETS_PER_WINDOW_KEY =
+      "dfs.namenode.top.window.num.buckets";
+  public static final int NNTOP_BUCKETS_PER_WINDOW_DEFAULT = 10;
+  public static final String NNTOP_NUM_USERS_KEY =
+      "dfs.namenode.top.num.users";
+  public static final int NNTOP_NUM_USERS_DEFAULT = 10;
+  // comma separated list of nntop reporting periods in minutes
+  public static final String NNTOP_WINDOWS_MINUTES_KEY =
+      "dfs.namenode.top.windows.minutes";
+  public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f1ea818..4598dda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -239,6 +239,9 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -881,7 +884,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw re;
     }
   }
-  
+
+  @VisibleForTesting
+  public List<AuditLogger> getAuditLoggers() {
+    return auditLoggers;
+  }
+
   @VisibleForTesting
   public RetryCache getRetryCache() {
     return retryCache;
@@ -970,6 +978,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (auditLoggers.isEmpty()) {
       auditLoggers.add(new DefaultAuditLogger());
     }
+
+    // Add audit logger to calculate top users
+    if (conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
+        DFSConfigKeys.NNTOP_ENABLED_DEFAULT)) {
+      String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+      TopConf nntopConf = new TopConf(conf);
+      TopMetrics.initSingleton(conf, NamenodeRole.NAMENODE.name(), sessionId,
+          nntopConf.nntopReportingPeriodsMs);
+      auditLoggers.add(new TopAuditLogger());
+    }
+
     return Collections.unmodifiableList(auditLoggers);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
new file mode 100644
index 0000000..4f26b17
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top;
+
+import java.net.InetAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.server.namenode.AuditLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+
+/**
+ * An {@link AuditLogger} that sends logged data directly to the metrics
+ * systems. It is used when the top service is used directly by the name node
+ */
+@InterfaceAudience.Private
+public class TopAuditLogger implements AuditLogger {
+  public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class);
+
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  @Override
+  public void logAuditEvent(boolean succeeded, String userName,
+      InetAddress addr, String cmd, String src, String dst, FileStatus status) {
+
+    TopMetrics instance = TopMetrics.getInstance();
+    if (instance != null) {
+      instance.report(succeeded, userName, addr, cmd, src, dst, status);
+    } else {
+      LOG.error("TopMetrics is not initialized yet!");
+    }
+
+    if (LOG.isDebugEnabled()) {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("allowed=").append(succeeded).append("\t");
+      sb.append("ugi=").append(userName).append("\t");
+      sb.append("ip=").append(addr).append("\t");
+      sb.append("cmd=").append(cmd).append("\t");
+      sb.append("src=").append(src).append("\t");
+      sb.append("dst=").append(dst).append("\t");
+      if (null == status) {
+        sb.append("perm=null");
+      } else {
+        sb.append("perm=");
+        sb.append(status.getOwner()).append(":");
+        sb.append(status.getGroup()).append(":");
+        sb.append(status.getPermission());
+      }
+      LOG.debug("------------------- logged event for top service: " + sb);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
new file mode 100644
index 0000000..0f4ebac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is a common place for NNTop configuration.
+ */
+@InterfaceAudience.Private
+public final class TopConf {
+
+  public static final String TOP_METRICS_REGISTRATION_NAME = "topusers";
+  public static final String TOP_METRICS_RECORD_NAME = "topparam";
+  /** A meta command representing the total number of commands. */
+  public static final String CMD_TOTAL = "total";
+  /** A meta user representing all users. */
+  public static final String ALL_USERS = "ALL";
+
+  /** nntop reporting periods in milliseconds. */
+  public final long[] nntopReportingPeriodsMs;
+
+  /**
+   * Reads the reporting windows (whole minutes) and converts them to ms.
+   *
+   * @param conf configuration to read dfs.namenode.top.windows.minutes from
+   * @throws IllegalArgumentException if a window is not an integer of at least 1
+   */
+  public TopConf(Configuration conf) {
+    String[] periodsStr = conf.getTrimmedStrings(
+        DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY,
+        DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT);
+    nntopReportingPeriodsMs = new long[periodsStr.length];
+    for (int i = 0; i < periodsStr.length; i++) {
+      nntopReportingPeriodsMs[i] = Integer.parseInt(periodsStr[i]) *
+          60L * 1000L; //min to ms
+    }
+    for (long aPeriodMs: nntopReportingPeriodsMs) {
+      Preconditions.checkArgument(aPeriodMs >= 60L * 1000L,
+          "minimum reporting period is 1 min!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
new file mode 100644
index 0000000..e8a4e23
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+
+/***
+ * The interface to the top metrics
+ * <p/>
+ * The producers use the {@link #report} method to report events and the
+ * consumers use {@link #getMetrics(MetricsCollector, boolean)} to retrieve the
+ * current top metrics. The default consumer is JMX but it could be any other
+ * user interface.
+ * <p/>
+ * Thread-safe: relies on thread-safety of RollingWindowManager
+ */
+@InterfaceAudience.Private
+public class TopMetrics implements MetricsSource {
+  public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
+
+  enum Singleton {
+    INSTANCE;
+
+    volatile TopMetrics impl = null;
+
+    synchronized TopMetrics init(Configuration conf, String processName,
+        String sessionId, long[] reportingPeriods) {
+      if (impl == null) {
+        impl =
+            create(conf, processName, sessionId, reportingPeriods,
+                DefaultMetricsSystem.instance());
+      }
+      logConf(conf);
+      return impl;
+    }
+  }
+
+  private static void logConf(Configuration conf) {
+    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
+        " = " +  conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
+    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_NUM_USERS_KEY +
+        " = " +  conf.get(DFSConfigKeys.NNTOP_NUM_USERS_KEY));
+    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY +
+        " = " +  conf.get(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY));
+  }
+
+  /**
+   * Return only the shortest periods for default
+   * TODO: make it configurable
+   */
+  final boolean smallestOnlyDefault = true;
+
+  /**
+   * The smallest of reporting periods
+   */
+  long smallestPeriod = Long.MAX_VALUE;
+
+  /**
+   * processName and sessionId might be leveraged later when we aggregate
+   * reports from multiple federated name nodes
+   */
+  final String processName, sessionId;
+
+  /**
+   * A map from reporting periods to WindowManager. Thread-safety is provided by
+   * the fact that the mapping is not changed after construction.
+   */
+  final Map<Long, RollingWindowManager> rollingWindowManagers =
+      new HashMap<Long, RollingWindowManager>();
+
+  TopMetrics(Configuration conf, String processName, String sessionId,
+      long[] reportingPeriods) {
+    this.processName = processName;
+    this.sessionId = sessionId;
+    for (int i = 0; i < reportingPeriods.length; i++) {
+      smallestPeriod = Math.min(smallestPeriod, reportingPeriods[i]);
+      rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
+          conf, reportingPeriods[i]));
+    }
+  }
+
+  public static TopMetrics create(Configuration conf, String processName,
+      String sessionId, long[] reportingPeriods, MetricsSystem ms) {
+    return ms.register(TopConf.TOP_METRICS_REGISTRATION_NAME,
+        "top metrics of the namenode in a last period of time", new TopMetrics(
+            conf, processName, sessionId, reportingPeriods));
+  }
+
+  public static TopMetrics initSingleton(Configuration conf,
+      String processName, String sessionId, long[] reportingPeriods) {
+    return Singleton.INSTANCE.init(conf, processName, sessionId,
+        reportingPeriods);
+  }
+
+  public static TopMetrics getInstance() {
+    TopMetrics topMetrics = Singleton.INSTANCE.impl;
+    Preconditions.checkArgument(topMetrics != null,
+          "The TopMetric singleton instance is not initialized."
+              + " Have you called initSingleton first?");
+    return topMetrics;
+  }
+
+  /**
+   * In testing, the previous initialization should be reset if the entire
+   * metric system is reinitialized
+   */
+  @VisibleForTesting
+  public static void reset() {
+    Singleton.INSTANCE.impl = null;
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    long realTime = Time.monotonicNow();
+    getMetrics(smallestOnlyDefault, realTime, collector, all);
+  }
+
+  public void getMetrics(boolean smallestOnly, long currTime,
+      MetricsCollector collector, boolean all) {
+    for (Entry<Long, RollingWindowManager> entry : rollingWindowManagers
+        .entrySet()) {
+      if (!smallestOnly || smallestPeriod == entry.getKey()) {
+        getMetrics(currTime, collector, entry.getKey(), entry.getValue(), all);
+      }
+    }
+  }
+
+  /**
+   * Get metrics for a particular recording period and its corresponding
+   * {@link RollingWindowManager}
+   * <p/>
+   * @param currTime the time passed to the window manager's snapshot
+   * @param collector the metric collector
+   * @param period the reporting period
+   * @param rollingWindowManager the window manager corresponding to the
+   *          reporting period
+   * @param all currently ignored
+   */
+  void getMetrics(long currTime, MetricsCollector collector, Long period,
+      RollingWindowManager rollingWindowManager, boolean all) {
+    MetricsRecordBuilder rb =
+        collector.addRecord(createTopMetricsRecordName(period))
+            .setContext("namenode").tag(ProcessName, processName)
+            .tag(SessionId, sessionId);
+    long min = period / 1000L / 60L; //ms -> min, same for every entry
+    String desc = "top user of name node in the past " + min + " minutes";
+
+    MetricValueMap snapshotMetrics = rollingWindowManager.snapshot(currTime);
+    LOG.debug("calling snapshot, result size is: {}", snapshotMetrics.size());
+    for (Map.Entry<String, Number> entry : snapshotMetrics.entrySet()) {
+      String key = entry.getKey();
+      Number value = entry.getValue();
+      LOG.debug("checking an entry: key: {} value: {}", key, value);
+
+      if (value instanceof Integer) {
+        rb.addGauge(info(key, desc), (Integer) value);
+      } else if (value instanceof Long) {
+        rb.addGauge(info(key, desc), (Long) value);
+      } else if (value instanceof Float) {
+        rb.addGauge(info(key, desc), (Float) value);
+      } else if (value instanceof Double) {
+        rb.addGauge(info(key, desc), (Double) value);
+      } else {
+        LOG.warn("Unsupported metric type: " + value.getClass());
+      }
+    }
+    LOG.debug("END iterating over metrics, result size is: {}",
+        snapshotMetrics.size());
+  }
+
+  /**
+   * Pick the same information that DefaultAuditLogger does before writing to a
+   * log file. This is to be consistent when {@link TopMetrics} is charged with
+   * data read back from log files instead of being invoked directly by the
+   * FsNamesystem
+   *
+   * @param succeeded currently ignored
+   * @param userName the user name, forwarded to {@link #report(String, String)}
+   * @param addr currently ignored
+   * @param cmd the command, forwarded to {@link #report(String, String)}
+   * @param src currently ignored
+   * @param dst currently ignored
+   * @param status currently ignored
+   */
+  public void report(boolean succeeded, String userName, InetAddress addr,
+      String cmd, String src, String dst, FileStatus status) {
+    // currently nntop makes use of only the username and the command
+    report(userName, cmd);
+  }
+
+  public void report(String userName, String cmd) {
+    long currTime = Time.monotonicNow();
+    report(currTime, userName, cmd);
+  }
+
+  /** Adds one to userName's count of cmd, and of the total, in every window. */
+  public void report(long currTime, String userName, String cmd) {
+    LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
+    userName = UserGroupInformation.trimLoginMethod(userName);
+    try {
+      for (RollingWindowManager manager : rollingWindowManagers.values()) {
+        manager.recordMetric(currTime, cmd, userName, 1);
+        manager.recordMetric(currTime, TopConf.CMD_TOTAL, userName, 1);
+      }
+    } catch (Throwable t) {
+      // best effort: never fail the caller, but keep the cause in the log
+      LOG.error("An error occurred while reflecting the event in top service, "
+          + "event: (time,cmd,userName)=(" + currTime + "," + cmd + ","
+          + userName + ")", t);
+    }
+  }
+
+  /***
+   *
+   * @param period the reporting period length in ms
+   * @return
+   */
+  public static String createTopMetricsRecordName(Long period) {
+    return TopConf.TOP_METRICS_RECORD_NAME + "-" + period;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
new file mode 100644
index 0000000..63ff125
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for exposing a rolling window view on the event that occur over time.
+ * Events are reported based on occurrence time. The total number of events in
+ * the last period covered by the rolling window can be retrieved by the
+ * {@link #getSum(long)} method.
+ * <p/>
+ *
+ * Assumptions:
+ * <p/>
+ *
+ * (1) Concurrent invocation of {@link #incAt} method are possible
+ * <p/>
+ *
+ * (2) The time parameter of two consecutive invocation of {@link #incAt} could
+ * be in any given order
+ * <p/>
+ *
+ * (3) The buffering delays are not more than the window length, i.e., after two
+ * consecutive invocation {@link #incAt(long time1, long)} and
+ * {@link #incAt(long time2, long)}, time1 < time2 || time1 - time2 < windowLenMs.
+ * This assumption helps avoiding unnecessary synchronizations.
+ * <p/>
+ *
+ * Thread-safety is built in the {@link RollingWindow.Bucket}
+ */
+@InterfaceAudience.Private
+public class RollingWindow {
+  private static final Logger LOG = LoggerFactory.getLogger(RollingWindow.class);
+
+  /**
+   * Each window is composed of buckets, which offer a trade-off between
+   * accuracy and space complexity: the lower the number of buckets, the less
+   * memory is required by the rolling window but more inaccuracy is possible in
+   * reading window total values.
+   */
+  Bucket[] buckets;
+  final int windowLenMs;
+  final int bucketSize;
+
+  /**
+   * @param windowLenMs The period that is covered by the window. This period must
+   *          be more than the buffering delays and a multiple of numBuckets.
+   * @param numBuckets number of buckets in the window
+   */
+  RollingWindow(int windowLenMs, int numBuckets) {
+    buckets = new Bucket[numBuckets];
+    for (int i = 0; i < numBuckets; i++) {
+      buckets[i] = new Bucket();
+    }
+    this.windowLenMs = windowLenMs;
+    this.bucketSize = windowLenMs / numBuckets;
+    if (windowLenMs % numBuckets != 0) {
+      throw new IllegalArgumentException(
+          "The bucket size in the rolling window is not integer: windowLenMs= "
+              + windowLenMs + " numBuckets= " + numBuckets);
+    }
+  }
+
+  /**
+   * When an event occurs at the specified time, this method reflects that in
+   * the rolling window.
+   * <p/>
+   *
+   * @param time the time at which the event occurred
+   * @param delta the delta that will be added to the window
+   */
+  public void incAt(long time, long delta) {
+    int bi = computeBucketIndex(time);
+    Bucket bucket = buckets[bi];
+    // If the last time the bucket was updated is out of the scope of the
+    // rolling window, reset the bucket.
+    if (bucket.isStaleNow(time)) {
+      bucket.safeReset(time);
+    }
+    bucket.inc(delta);
+  }
+
+  private int computeBucketIndex(long time) {
+    // map time to a bucket; long math keeps the product from overflowing int
+    long positionOnWindow = time % windowLenMs;
+    return (int) (positionOnWindow * buckets.length / windowLenMs);
+  }
+
+  /**
+   * Thread-safety is provided by synchronization when resetting the update time
+   * as well as atomic fields.
+   */
+  private class Bucket {
+    AtomicLong value = new AtomicLong(0);
+    AtomicLong updateTime = new AtomicLong(0);
+
+    /**
+     * Check whether the last time that the bucket was updated is no longer
+     * covered by rolling window.
+     *
+     * @param time the current time
+     * @return true if the bucket state is stale
+     */
+    boolean isStaleNow(long time) {
+      long utime = updateTime.get();
+      return time - utime >= windowLenMs;
+    }
+
+    /**
+     * Safely reset the bucket state considering concurrent updates (inc) and
+     * resets.
+     *
+     * @param time the current time
+     */
+    void safeReset(long time) {
+      // At any point in time, only one thread is allowed to reset the
+      // bucket
+      synchronized (this) {
+        if (isStaleNow(time)) {
+          // reset the value before setting the time, it allows other
+          // threads to safely assume that the value is updated if the
+          // time is not stale
+          value.set(0);
+          updateTime.set(time);
+        }
+        // else a concurrent thread has already reset it: do nothing
+      }
+    }
+
+    /**
+     * Increment the bucket. It assumes that staleness check is already
+     * performed. We do not need to update the {@link #updateTime} because as
+     * long as the {@link #updateTime} belongs to the current view of the
+     * rolling window, the algorithm works fine.
+     */
+    void inc(long delta) {
+      value.addAndGet(delta);
+    }
+  }
+
+  /**
+   * Get value represented by this window at the specified time
+   * <p/>
+   *
+   * If time lags behind the latest update time, the new updates are still
+   * included in the sum
+   *
+   * @param time
+   * @return number of events occurred in the past period
+   */
+  public long getSum(long time) {
+    long sum = 0;
+    for (Bucket bucket : buckets) {
+      boolean stale = bucket.isStaleNow(time);
+      if (!stale) {
+        sum += bucket.value.get();
+      }
+      if (LOG.isDebugEnabled()) {
+        long bucketTime = bucket.updateTime.get();
+        String timeStr = new Date(bucketTime).toString();
+        LOG.debug("Sum: + " + sum + " Bucket: updateTime: " + timeStr + " ("
+            + bucketTime + ") isStale " + stale + " at " + time);
+      }
+    }
+    return sum;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
new file mode 100644
index 0000000..d818cce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A class to manage the set of {@link RollingWindow}s. This class is the
+ * interface of metrics system to the {@link RollingWindow}s to retrieve the
+ * current top metrics.
+ * <p/>
+ * Thread-safety is provided by each {@link RollingWindow} being thread-safe as
+ * well as {@link ConcurrentHashMap} for the collection of them.
+ */
+@InterfaceAudience.Private
+public class RollingWindowManager {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      RollingWindowManager.class);
+
+  private int windowLenMs;
+  private int bucketsPerWindow; // e.g., 10 buckets per minute
+  private int topUsersCnt; // e.g., report top 10 metrics
+
+  /**
+   * Create a metric name composed of the command and user
+   *
+   * @param command the command executed
+   * @param user    the user
+   * @return a composed metric name
+   */
+  @VisibleForTesting
+  public static String createMetricName(String command, String user) {
+    return command + "." + user;
+  }
+
+  static private class RollingWindowMap extends
+      ConcurrentHashMap<String, RollingWindow> {
+    private static final long serialVersionUID = -6785807073237052051L;
+  }
+
+  /**
+   * A mapping from each reported metric to its {@link RollingWindowMap} that
+   * maintains the set of {@link RollingWindow}s for the users that have
+   * operated on that metric.
+   */
+  public ConcurrentHashMap<String, RollingWindowMap> metricMap =
+      new ConcurrentHashMap<String, RollingWindowMap>();
+
+  /**
+   * Create a manager whose rolling windows all span the given reporting
+   * period. The number of buckets per window is read from
+   * {@link DFSConfigKeys#NNTOP_BUCKETS_PER_WINDOW_KEY} and the number of top
+   * users to report from {@link DFSConfigKeys#NNTOP_NUM_USERS_KEY}.
+   *
+   * @param conf the configuration to read the nntop settings from
+   * @param reportingPeriodMs the length of the rolling window in milliseconds;
+   *                          must be positive and fit in an int
+   * @throws IllegalArgumentException if the reporting period is out of range,
+   *         if the number of buckets is not positive, exceeds the window
+   *         length or does not divide it evenly, or if the number of top
+   *         users is not positive
+   */
+  public RollingWindowManager(Configuration conf, long reportingPeriodMs) {
+    // windowLenMs is an int: validate before the narrowing cast so that an
+    // out-of-range period is rejected instead of being silently truncated
+    Preconditions.checkArgument(
+        reportingPeriodMs > 0 && reportingPeriodMs <= Integer.MAX_VALUE,
+        "the reporting period must be in (0, %s] ms but was %s",
+        Integer.MAX_VALUE, reportingPeriodMs);
+    windowLenMs = (int) reportingPeriodMs;
+    bucketsPerWindow =
+        conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY,
+            DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT);
+    Preconditions.checkArgument(bucketsPerWindow > 0,
+        "a window should have at least one bucket");
+    Preconditions.checkArgument(bucketsPerWindow <= windowLenMs,
+        "the minimum size of a bucket is 1 ms");
+    //same-size buckets
+    Preconditions.checkArgument(windowLenMs % bucketsPerWindow == 0,
+        "window size must be a multiplication of number of buckets");
+    topUsersCnt =
+        conf.getInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY,
+            DFSConfigKeys.NNTOP_NUM_USERS_DEFAULT);
+    Preconditions.checkArgument(topUsersCnt > 0,
+        "the number of requested top users must be at least 1");
+  }
+
+  /**
+   * Called when the metric command is changed by "delta" units at time "time"
+   * via user "user"
+   *
+   * @param time the time of the event
+   * @param command the metric that is updated, e.g., the operation name
+   * @param user the user that updated the metric
+   * @param delta the amount of change in the metric, e.g., +1
+   */
+  public void recordMetric(long time, String command, String user, long delta) {
+    RollingWindow window = getRollingWindow(command, user);
+    window.incAt(time, delta);
+  }
+
+  /**
+   * Take a snapshot of current top users in the past period.
+   * <p/>
+   * For every metric, each user's window is summed at the given time. Windows
+   * whose sum is zero are removed from the metric's map while iterating.
+   * For a metric with at least one non-empty window, the result holds one
+   * entry for {@link TopConf#ALL_USERS} carrying the total of all offered
+   * window sums, and one entry per user kept by the top-n queue.
+   *
+   * @param time the current time
+   * @return a map between the top metrics and their values. The user is encoded
+   * in the metric name. Refer to {@link RollingWindowManager#createMetricName} for
+   * the actual format.
+   */
+  public MetricValueMap snapshot(long time) {
+    MetricValueMap map = new MetricValueMap();
+    Set<String> metricNames = metricMap.keySet();
+    LOG.debug("iterating in reported metrics, size={} values={}",
+        metricNames.size(), metricNames);
+    for (Map.Entry<String,RollingWindowMap> rwEntry: metricMap.entrySet()) {
+      String metricName = rwEntry.getKey();
+      RollingWindowMap rollingWindows = rwEntry.getValue();
+      TopN topN = new TopN(topUsersCnt);
+      Iterator<Map.Entry<String, RollingWindow>> iterator =
+          rollingWindows.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, RollingWindow> entry = iterator.next();
+        String userName = entry.getKey();
+        RollingWindow aWindow = entry.getValue();
+        long windowSum = aWindow.getSum(time);
+        // do the gc here
+        if (windowSum == 0) {
+          LOG.debug("gc window of metric: {} userName: {}",
+              metricName, userName);
+          iterator.remove();
+          continue;
+        }
+        LOG.debug("offer window of metric: {} userName: {} sum: {}",
+            metricName, userName, windowSum);
+        topN.offer(new NameValuePair(userName, windowSum));
+      }
+      int n = topN.size();
+      // debug, not info: this runs once per metric on every snapshot and
+      // would otherwise flood the log; parameterized to skip the string
+      // building when debug is off
+      LOG.debug("topN size for command {} is: {}", metricName, n);
+      if (n == 0) {
+        continue;
+      }
+      String allMetricName =
+          createMetricName(metricName, TopConf.ALL_USERS);
+      // the total covers every offered window, not only the retained top n
+      map.put(allMetricName, Long.valueOf(topN.getTotal()));
+      for (int i = 0; i < n; i++) {
+        NameValuePair userEntry = topN.poll();
+        String userMetricName =
+            createMetricName(metricName, userEntry.name);
+        map.put(userMetricName, Long.valueOf(userEntry.value));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Look up the rolling window of the given metric and user, creating the
+   * per-metric map and the window itself on first use. When another thread
+   * installs an instance first, that instance is the one returned.
+   *
+   * @param metric the updated metric
+   * @param user the user that updated the metric
+   * @return the rolling window
+   */
+  private RollingWindow getRollingWindow(String metric, String user) {
+    RollingWindowMap windowsOfMetric = metricMap.get(metric);
+    if (windowsOfMetric == null) {
+      RollingWindowMap fresh = new RollingWindowMap();
+      RollingWindowMap raced = metricMap.putIfAbsent(metric, fresh);
+      // keep whichever map actually ended up registered
+      windowsOfMetric = (raced == null) ? fresh : raced;
+    }
+    RollingWindow existing = windowsOfMetric.get(user);
+    if (existing == null) {
+      RollingWindow created =
+          new RollingWindow(windowLenMs, bucketsPerWindow);
+      RollingWindow raced = windowsOfMetric.putIfAbsent(user, created);
+      return (raced == null) ? created : raced;
+    }
+    return existing;
+  }
+
+  /**
+   * A pair of a name and its corresponding value. Ordering, equality and
+   * hashing are all based on the value only; the name is not considered.
+   */
+  static private class NameValuePair implements Comparable<NameValuePair> {
+    String name;
+    long value;
+
+    public NameValuePair(String metricName, long value) {
+      this.name = metricName;
+      this.value = value;
+    }
+
+    /**
+     * Compare by value.
+     *
+     * @param other the pair to compare with
+     * @return a negative number, zero, or a positive number if this value is
+     * less than, equal to, or greater than the other value
+     */
+    @Override
+    public int compareTo(NameValuePair other) {
+      // compare explicitly: narrowing (value - other.value) to int yields the
+      // wrong sign, or zero, once the difference leaves the int range
+      if (value < other.value) {
+        return -1;
+      }
+      return value > other.value ? 1 : 0;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof NameValuePair) {
+        return compareTo((NameValuePair)other) == 0;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.valueOf(value).hashCode();
+    }
+  }
+
+  /**
+   * A fixed-size priority queue, used to retrieve top-n of offered entries.
+   */
+  static private class TopN extends PriorityQueue<NameValuePair> {
+    private static final long serialVersionUID = 5134028249611535803L;
+    int n; // > 0
+    private long total = 0;
+
+    TopN(int n) {
+      super(n);
+      this.n = n;
+    }
+
+    @Override
+    public boolean offer(NameValuePair entry) {
+      updateTotal(entry.value);
+      if (size() == n) {
+        NameValuePair smallest = peek();
+        if (smallest.value >= entry.value) {
+          return false;
+        }
+        poll(); // remove smallest
+      }
+      return super.offer(entry);
+    }
+
+    private void updateTotal(long value) {
+      total += value;
+    }
+
+    public long getTotal() {
+      return total;
+    }
+  }
+
+  /**
+   * A mapping from metric names to their absolute values and their percentage
+   */
+  @InterfaceAudience.Private
+  public static class MetricValueMap extends HashMap<String, Number> {
+    private static final long serialVersionUID = 8936732010242400171L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 31145ba..06d7ba8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2224,4 +2224,32 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.top.enabled</name>
+  <value>true</value>
+  <description>Enable nntop: reporting top users on namenode
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.top.window.num.buckets</name>
+  <value>10</value>
+  <description>Number of buckets in the rolling window implementation of nntop
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.top.num.users</name>
+  <value>10</value>
+  <description>Number of top users returned by the top tool
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.top.windows.minutes</name>
+  <value>1,5,25</value>
+  <description>comma separated list of nntop reporting periods in minutes
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
index e1e1c67..c91cd75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
@@ -20,10 +20,9 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 
@@ -43,6 +42,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -94,6 +94,29 @@ public class TestAuditLogger {
     }
   }
 
+  /**
+   * Tests that TopAuditLogger can be disabled
+   */
+  @Test
+  public void testDisableTopAuditLogger() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(NNTOP_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      cluster.waitClusterUp();
+      List<AuditLogger> auditLoggers =
+          cluster.getNameNode().getNamesystem().getAuditLoggers();
+      for (AuditLogger auditLogger : auditLoggers) {
+        assertFalse(
+            "top audit logger is still hooked in after it is disabled",
+            auditLogger instanceof TopAuditLogger);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testWebHdfsAuditLogger() throws IOException, URISyntaxException {
     Configuration conf = new HdfsConfiguration();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 1afbcea..c028a4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.metrics;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
@@ -46,6 +47,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -53,6 +58,7 @@ import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -87,6 +93,11 @@ public class TestNameNodeMetrics {
     CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
       .getLogger().setLevel(Level.DEBUG);
+    /**
+     * need it to test {@link #testTopAuditLogger}
+     */
+    CONF.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
+        TopAuditLogger.class.getName());
   }
   
   private MiniDFSCluster cluster;
@@ -101,6 +112,7 @@ public class TestNameNodeMetrics {
   
   @Before
   public void setUp() throws Exception {
+    TopMetrics.reset();//reset the static init done by prev test
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
     cluster.waitActive();
     namesystem = cluster.getNamesystem();
@@ -453,4 +465,53 @@ public class TestNameNodeMetrics {
     assertQuantileGauges("Syncs1s", rb);
     assertQuantileGauges("BlockReport1s", rb);
   }
+
+  /**
+   * Test whether {@link TopMetrics} is registered with metrics system
+   * @throws Exception
+   */
+  @Test
+  public void testTopMetrics() throws Exception {
+    final String testUser = "NNTopTestUser";
+    final String testOp = "NNTopTestOp";
+    final String metricName =
+        RollingWindowManager.createMetricName(testOp, testUser);
+    TopMetrics.getInstance().report(testUser, testOp);
+    final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
+    MetricsRecordBuilder rb = getMetrics(regName);
+    assertGauge(metricName, 1L, rb);
+  }
+
+  /**
+   * Test whether {@link TopAuditLogger} is registered as an audit logger
+   * @throws Exception
+   */
+  @Test
+  public void testTopAuditLogger() throws Exception {
+    //note: the top audit logger should already be set in conf
+    //issue one command, any command is fine
+    FileSystem fs = cluster.getFileSystem();
+    long time = System.currentTimeMillis();
+    fs.setTimes(new Path("/"), time, time);
+    //the command should be reflected in the total count of all users
+    final String testUser = TopConf.ALL_USERS;
+    final String testOp = TopConf.CMD_TOTAL;
+    final String metricName =
+        RollingWindowManager.createMetricName(testOp, testUser);
+    final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
+    MetricsRecordBuilder rb = getMetrics(regName);
+    assertGaugeGreaterThan(metricName, 1L, rb);
+  }
+
+  /**
+   * Assert that a long gauge metric is at least the expected value. Despite
+   * the method name the comparison is inclusive: a gauge equal to
+   * {@code expected} passes.
+   * @param name  of the metric
+   * @param expected  minimum expected value of the metric
+   * @param rb  the record builder mock used to getMetrics
+   */
+  public static void assertGaugeGreaterThan(String name, long expected,
+                                 MetricsRecordBuilder rb) {
+    Assert.assertTrue("Bad value for metric " + name,
+        expected <= MetricsAsserts.getLongGauge(name, rb));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindow.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindow.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindow.java
new file mode 100644
index 0000000..804c641
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindow.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRollingWindow {
+
+  final int WINDOW_LEN = 60000;
+  final int BUCKET_CNT = 10;
+  final int BUCKET_LEN = WINDOW_LEN / BUCKET_CNT;
+
+  @Test
+  public void testBasics() {
+    RollingWindow window = new RollingWindow(WINDOW_LEN, BUCKET_CNT);
+    long time = 1;
+    Assert.assertEquals("The initial sum of rolling window must be 0", 0,
+        window.getSum(time));
+    time = WINDOW_LEN + BUCKET_LEN * 3 / 2;
+    Assert.assertEquals("The initial sum of rolling window must be 0", 0,
+        window.getSum(time));
+
+    window.incAt(time, 5);
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the recent update", 5,
+        window.getSum(time));
+
+    time += BUCKET_LEN;
+    window.incAt(time, 6);
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the recent update", 11,
+        window.getSum(time));
+
+    time += WINDOW_LEN - BUCKET_LEN;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect rolling effect", 6,
+        window.getSum(time));
+
+    time += BUCKET_LEN;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect rolling effect", 0,
+        window.getSum(time));
+  }
+
+  @Test
+  public void testReorderedAccess() {
+    RollingWindow window = new RollingWindow(WINDOW_LEN, BUCKET_CNT);
+    long time = 2 * WINDOW_LEN + BUCKET_LEN * 3 / 2;
+    window.incAt(time, 5);
+
+    time++;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the recent update", 5,
+        window.getSum(time));
+
+    long reorderedTime = time - 2 * BUCKET_LEN;
+    window.incAt(reorderedTime, 6);
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the reordered update", 11,
+        window.getSum(time));
+
+    time = reorderedTime + WINDOW_LEN;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect rolling effect", 5,
+        window.getSum(time));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dcb8e244/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
new file mode 100644
index 0000000..de21714
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+
+public class TestRollingWindowManager {
+
+  Configuration conf;
+  RollingWindowManager manager;
+  String[] users;
+  final static int MIN_2_MS = 60000;
+
+  final int WINDOW_LEN_MS = 1 * MIN_2_MS;
+  final int BUCKET_CNT = 10;
+  final int N_TOP_USERS = 10;
+  final int BUCKET_LEN = WINDOW_LEN_MS / BUCKET_CNT;
+
+  @Before
+  public void init() {
+    conf = new Configuration();
+    conf.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, BUCKET_CNT);
+    conf.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+    manager = new RollingWindowManager(conf, WINDOW_LEN_MS);
+    users = new String[2 * N_TOP_USERS];
+    for (int i = 0; i < users.length; i++) {
+      users[i] = "user" + i;
+    }
+  }
+
+  /**
+   * Records "open" and "close" for twice as many users as the configured
+   * top-n, then checks the snapshot contents before and after the window
+   * has moved past the "open" records.
+   */
+  @Test
+  public void testTops() {
+    long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
+    // user i gets a larger value than user i-1, so the highest-indexed
+    // users are the expected top users
+    for (int i = 0; i < users.length; i++)
+      manager.recordMetric(time, "open", users[i], (i + 1) * 2);
+    time++;
+    for (int i = 0; i < users.length; i++)
+      manager.recordMetric(time, "close", users[i], i + 1);
+    time++;
+    MetricValueMap tops = manager.snapshot(time);
+
+    // per command: one entry per top user plus one all-users total entry
+    assertEquals("The number of returned top metrics is invalid",
+        2 * (N_TOP_USERS + 1), tops.size());
+    // the user with the second-largest recorded value
+    int userIndex = users.length - 2;
+    String metricName = RollingWindowManager.createMetricName("open",
+        users[userIndex]);
+    boolean includes = tops.containsKey(metricName);
+    assertTrue("The order of entries in top metrics is wrong", includes);
+    assertEquals("The reported value by top is different from recorded one",
+        (userIndex + 1) * 2, ((Long) tops.get(metricName)).longValue());
+
+    // move the window forward not to see the "open" results
+    time += WINDOW_LEN_MS - 2;
+    // top should now include only "close" results
+    tops = manager.snapshot(time);
+    assertEquals("The number of returned top metrics is invalid",
+        N_TOP_USERS + 1, tops.size());
+    // metricName still refers to the "open" metric of the chosen user
+    includes = tops.containsKey(metricName);
+    assertFalse("After rolling, the top list still includes the stale metrics",
+        includes);
+
+    metricName = RollingWindowManager.createMetricName("close",
+        users[userIndex]);
+    includes = tops.containsKey(metricName);
+    assertTrue("The order of entries in top metrics is wrong", includes);
+    assertEquals("The reported value by top is different from recorded one",
+        (userIndex + 1), ((Long) tops.get(metricName)).longValue());
+  }
+}