Posted to commits@hbase.apache.org by ap...@apache.org on 2018/08/29 23:38:26 UTC

[1/3] hbase git commit: HBASE-15728 Add Per-Table metrics back

Repository: hbase
Updated Branches:
  refs/heads/branch-1 492d69daa -> fb74f215b
  refs/heads/branch-2 737ac4847 -> 72a29211c
  refs/heads/master fcd883b5d -> e903ae5d5


HBASE-15728 Add Per-Table metrics back

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e903ae5d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e903ae5d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e903ae5d

Branch: refs/heads/master
Commit: e903ae5d5ec98e71c2c28e2cbd55309a17e34612
Parents: fcd883b
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 15:08:29 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 29 15:21:00 2018 -0700

----------------------------------------------------------------------
 .../regionserver/MetricsRegionServerSource.java |   6 +-
 .../regionserver/MetricsRegionWrapper.java      |   2 +
 .../MetricsTableAggregateSource.java            |   8 +-
 .../hbase/regionserver/MetricsTableSource.java  |  90 +++++-
 .../MetricsTableWrapperAggregate.java           |  60 +++-
 .../MetricsRegionServerSourceImpl.java          |   3 +-
 .../MetricsTableAggregateSourceImpl.java        |  43 ++-
 .../regionserver/MetricsTableSourceImpl.java    | 291 +++++++++++++++++--
 .../regionserver/MetricsTableWrapperStub.java   | 110 +++++++
 .../TestMetricsRegionSourceImpl.java            |   5 +
 .../TestMetricsTableSourceImpl.java             |  59 +---
 .../hadoop/hbase/regionserver/HRegion.java      |   3 +-
 .../hbase/regionserver/HRegionServer.java       |   4 +-
 .../hadoop/hbase/regionserver/HStore.java       |   8 +-
 .../hbase/regionserver/MetricsRegionServer.java |  30 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   5 +
 .../hadoop/hbase/regionserver/MetricsTable.java |  54 +++-
 .../MetricsTableWrapperAggregateImpl.java       | 224 ++++++++------
 .../regionserver/MetricsRegionWrapperStub.java  |   5 +
 .../regionserver/MetricsTableWrapperStub.java   |  67 -----
 .../regionserver/TestMetricsRegionServer.java   |  12 +-
 .../regionserver/TestMetricsTableAggregate.java | 121 ++++++--
 22 files changed, 909 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 77367e7..b94ee2d 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -254,6 +254,9 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
           "action at the RPC Server (Sums 'readRequestsCount' and 'writeRequestsCount'); counts" +
           "once per access whether a Put of 1M rows or a Get that returns 1M Results";
   String READ_REQUEST_COUNT = "readRequestCount";
+  String FILTERED_READ_REQUEST_COUNT = "filteredReadRequestCount";
+  String FILTERED_READ_REQUEST_COUNT_DESC =
+      "Number of read requests this region server has answered.";
   String READ_REQUEST_COUNT_DESC =
       "Number of read requests with non-empty Results that this RegionServer has answered.";
   String READ_REQUEST_RATE_PER_SECOND = "readRequestRatePerSecond";
@@ -262,9 +265,6 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
   String CP_REQUEST_COUNT = "cpRequestCount";
   String CP_REQUEST_COUNT_DESC =
       "Number of coprocessor service requests this region server has answered.";
-  String FILTERED_READ_REQUEST_COUNT = "filteredReadRequestCount";
-  String FILTERED_READ_REQUEST_COUNT_DESC =
-    "Number of filtered read requests this RegionServer has answered.";
   String WRITE_REQUEST_COUNT = "writeRequestCount";
   String WRITE_REQUEST_COUNT_DESC =
       "Number of mutation requests this RegionServer has answered.";

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
index 268f31f..df776b1 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
@@ -107,6 +107,8 @@ public interface MetricsRegionWrapper {
    */
   long getWriteRequestCount();
 
+  long getTotalRequestCount();
+
   long getNumFilesCompacted();
 
   long getNumBytesCompacted();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
index 39e2372..f746c98 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
@@ -52,18 +52,16 @@ public interface MetricsTableAggregateSource extends BaseSource {
   String NUMBER_OF_TABLES_DESC = "Number of tables in the metrics system";
 
   /**
-   * Register a MetricsTableSource as being open.
-   *
+   * Returns MetricsTableSource registered for the table. Creates one if not defined.
    * @param table The table name
-   * @param source the source for the table being opened.
    */
-  void register(String table, MetricsTableSource source);
+  MetricsTableSource getOrCreateTableSource(String table, MetricsTableWrapperAggregate wrapper);
 
   /**
    * Remove a table's source. This is called when regions of a table are closed.
    *
    * @param table The table name
    */
-  void deregister(String table);
+  void deleteTableSource(String table);
 
 }
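
As a rough illustration of the reworked contract above: callers no longer build a MetricsTableSource and register it themselves, they ask the aggregate for one by table name and retire it the same way. A minimal sketch, assuming the aggregate is obtained through the compatibility factory as elsewhere in this patch; the table name "ns:demo", the wrapper argument, and the class/method names are placeholders, not taken from the commit:

    import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
    import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactory;
    import org.apache.hadoop.hbase.regionserver.MetricsTableAggregateSource;
    import org.apache.hadoop.hbase.regionserver.MetricsTableSource;
    import org.apache.hadoop.hbase.regionserver.MetricsTableWrapperAggregate;

    public final class PerTableSourceSketch {
      // Obtains (or lazily creates) the per-table source, records one flush,
      // and finally retires the source when the table's last region closes.
      static void recordFlush(MetricsTableWrapperAggregate wrapper, long flushMillis) {
        MetricsTableAggregateSource tableAgg =
            CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
                .getTableAggregate();
        MetricsTableSource source = tableAgg.getOrCreateTableSource("ns:demo", wrapper);
        source.updateFlushTime(flushMillis);
        // ...later, once no region of the table remains open on this server:
        tableAgg.deleteTableSource("ns:demo");
      }
    }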

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
index c58fa01..8b62741 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
@@ -20,25 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+import java.io.Closeable;
+
 /**
  * This interface will be implemented to allow region server to push table metrics into
  * MetricsRegionAggregateSource that will in turn push data to the Hadoop metrics system.
  */
 @InterfaceAudience.Private
-public interface MetricsTableSource extends Comparable<MetricsTableSource> {
-
-  String READ_REQUEST_COUNT = "readRequestCount";
-  String READ_REQUEST_COUNT_DESC = "Number of read requests";
-  String CP_REQUEST_COUNT = "cpRequestCount";
-  String CP_REQUEST_COUNT_DESC = "Number of coprocessor service requests";
-  String WRITE_REQUEST_COUNT = "writeRequestCount";
-  String WRITE_REQUEST_COUNT_DESC = "Number of write requests";
-  String TOTAL_REQUEST_COUNT = "totalRequestCount";
-  String TOTAL_REQUEST_COUNT_DESC = "Number of total requests";
-  String MEMSTORE_SIZE = "memstoreSize";
-  String MEMSTORE_SIZE_DESC = "The size of memory stores";
-  String STORE_FILE_SIZE = "storeFileSize";
-  String STORE_FILE_SIZE_DESC = "The size of store files size";
+public interface MetricsTableSource extends Comparable<MetricsTableSource>, Closeable {
+
   String TABLE_SIZE = "tableSize";
   String TABLE_SIZE_DESC = "Total size of the table in the region server";
 
@@ -47,11 +37,83 @@ public interface MetricsTableSource extends Comparable<MetricsTableSource> {
   /**
    * Close the table's metrics as all the regions are closing.
    */
+  @Override
   void close();
 
+  void registerMetrics();
+
   /**
    * Get the aggregate source to which this reports.
    */
   MetricsTableAggregateSource getAggregateSource();
 
+  /**
+   * Update the split transaction time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateSplitTime(long t);
+
+  /**
+   * Increment the number of requested splits
+   */
+  void incrSplitRequest();
+
+  /**
+   * Increment number of successful splits
+   */
+  void incrSplitSuccess();
+
+  /**
+   * Update the flush time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFlushTime(long t);
+
+  /**
+   * Update the flush memstore size histogram
+   * @param bytes the number of bytes in the memstore
+   */
+  void updateFlushMemstoreSize(long bytes);
+
+  /**
+   * Update the flush output file size histogram
+   * @param bytes the number of bytes in the output file
+   */
+  void updateFlushOutputSize(long bytes);
+
+  /**
+   * Update the compaction time histogram, both major and minor
+   * @param isMajor whether compaction is a major compaction
+   * @param t time it took, in milliseconds
+   */
+  void updateCompactionTime(boolean isMajor, long t);
+
+  /**
+   * Update the compaction input number of files histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param c number of files
+   */
+  void updateCompactionInputFileCount(boolean isMajor, long c);
+
+  /**
+   * Update the compaction total input file size histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param bytes the number of bytes of the compaction input file
+   */
+  void updateCompactionInputSize(boolean isMajor, long bytes);
+
+  /**
+   * Update the compaction output number of files histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param c number of files
+   */
+  void updateCompactionOutputFileCount(boolean isMajor, long c);
+
+  /**
+   * Update the compaction total output file size
+   * @param isMajor whether compaction is a major compaction
+   * @param bytes the number of bytes of the compaction output file
+   */
+  void updateCompactionOutputSize(boolean isMajor, long bytes);
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
index aaa2460..48b3a2c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
@@ -30,7 +30,7 @@ public interface MetricsTableWrapperAggregate {
   /**
    * Get the number of read requests that have been issued against this table
    */
-  long getReadRequestsCount(String table);
+  long getReadRequestCount(String table);
 
   /**
    * Get the number of CoprocessorService requests that have been issued against this table
@@ -38,27 +38,75 @@ public interface MetricsTableWrapperAggregate {
   long getCpRequestsCount(String table);
 
   /**
-   * Get the number of write requests that have been issued against this table
+   * Get the total number of filtered read requests that have been issued against this table
    */
-  long getWriteRequestsCount(String table);
+  long getFilteredReadRequestCount(String table);
+  /**
+   * Get the number of write requests that have been issued for this table
+   */
+  long getWriteRequestCount(String table);
 
   /**
-   * Get the total number of requests that have been issued against this table
+   * Get the total number of requests that have been issued for this table
    */
   long getTotalRequestsCount(String table);
 
   /**
    * Get the memory store size against this table
    */
-  long getMemStoresSize(String table);
+  long getMemStoreSize(String table);
 
   /**
    * Get the store file size against this table
    */
-  long getStoreFilesSize(String table);
+  long getStoreFileSize(String table);
 
   /**
    * Get the table region size against this table
    */
   long getTableSize(String table);
+
+
+  /**
+   * Get the average region size for this table
+   */
+  long getAvgRegionSize(String table);
+
+  /**
+   * Get the number of regions hosted on this region server for this table
+   */
+  long getNumRegions(String table);
+
+  /**
+   * Get the number of stores hosted on this region server for this table
+   */
+  long getNumStores(String table);
+
+  /**
+   * Get the number of store files hosted on this region server for this table
+   */
+  long getNumStoreFiles(String table);
+
+  /**
+   * @return Max age of store files for this table
+   */
+  long getMaxStoreFileAge(String table);
+
+  /**
+   * @return Min age of store files for this table
+   */
+  long getMinStoreFileAge(String table);
+
+  /**
+   *  @return Average age of store files for this table
+   */
+  long getAvgStoreFileAge(String table);
+
+  /**
+   *  @return Number of reference files for this table
+   */
+  long getNumReferenceFiles(String table);
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 7bed36c..44dbc79 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -54,9 +54,10 @@ public class MetricsRegionServerSourceImpl
   private final MutableFastCounter slowGet;
   private final MutableFastCounter slowIncrement;
   private final MutableFastCounter slowAppend;
+
+  // split related metrics
   private final MutableFastCounter splitRequest;
   private final MutableFastCounter splitSuccess;
-
   private final MetricHistogram splitTimeHisto;
 
   // flush related metrics

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
index dc91964..363ddd2 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.hbase.metrics.Interns;
 import org.apache.hadoop.metrics2.MetricsCollector;
@@ -46,22 +47,46 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
     super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
   }
 
-  @Override
-  public void register(String table, MetricsTableSource source) {
-    tableSources.put(table, source);
+  private void register(MetricsTableSource source) {
+    synchronized (this) {
+      source.registerMetrics();
+    }
   }
 
   @Override
-  public void deregister(String table) {
+  public void deleteTableSource(String table) {
     try {
-      tableSources.remove(table);
+      synchronized (this) {
+        MetricsTableSource source = tableSources.remove(table);
+        if (source != null) {
+          source.close();
+        }
+      }
     } catch (Exception e) {
       // Ignored. If this errors out it means that someone is double
-      // closing the region source and the region is already nulled out.
-      LOG.info(
-        "Error trying to remove " + table + " from " + this.getClass().getSimpleName(),
-        e);
+      // closing the table source and the table metrics are already nulled out.
+      LOG.info("Error trying to remove " + table + " from " + getClass().getSimpleName(), e);
+    }
+  }
+
+  @Override
+  public MetricsTableSource getOrCreateTableSource(String table,
+      MetricsTableWrapperAggregate wrapper) {
+    MetricsTableSource source = tableSources.get(table);
+    if (source != null) {
+      return source;
+    }
+    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+      .createTable(table, wrapper);
+    MetricsTableSource prev = tableSources.putIfAbsent(table, source);
+
+    if (prev != null) {
+      return prev;
+    } else {
+      // register the new metrics now
+      register(source);
     }
+    return source;
   }
 
   /**
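
The get-or-create above follows a standard optimistic pattern: check the map, create on a miss, let ConcurrentHashMap.putIfAbsent pick a single winner, and only the winning instance registers its metrics (a losing instance is simply dropped before it ever publishes anything). A self-contained sketch of that idiom in plain JDK terms, purely to show the concurrency reasoning; the Source class below is a stand-in, not an HBase type:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public final class GetOrCreateSketch {
      /** Stand-in for a per-table metrics source. */
      static final class Source {
        private final String table;
        Source(String table) { this.table = table; }
        void register() { System.out.println("registering metrics for " + table); }
        void close() { System.out.println("removing metrics for " + table); }
      }

      private final ConcurrentMap<String, Source> sources = new ConcurrentHashMap<>();

      Source getOrCreate(String table) {
        Source existing = sources.get(table);   // fast path, no locking
        if (existing != null) {
          return existing;
        }
        Source created = new Source(table);
        Source prev = sources.putIfAbsent(table, created);
        if (prev != null) {
          return prev;                          // lost the race; discard 'created'
        }
        created.register();                     // only the winner publishes metrics
        return created;
      }

      void delete(String table) {
        Source removed = sources.remove(table);
        if (removed != null) {
          removed.close();                      // a repeated delete is a harmless no-op
        }
      }
    }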

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
index 0733179..9ac62cd 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
@@ -23,11 +23,57 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.metrics.Interns;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_MEMSTORE_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_MEMSTORE_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_MEMSTORE_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_MEMSTORE_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_INPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_INPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_REQUEST_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_REQUEST_KEY;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
+
 @InterfaceAudience.Private
 public class MetricsTableSourceImpl implements MetricsTableSource {
 
@@ -46,12 +92,41 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   private final TableName tableName;
   private final int hashCode;
 
+  // split related metrics
+  private MutableFastCounter splitRequest;
+  private MutableFastCounter splitSuccess;
+  private MetricHistogram splitTimeHisto;
+
+  // flush related metrics
+  private MetricHistogram flushTimeHisto;
+  private MetricHistogram flushMemstoreSizeHisto;
+  private MetricHistogram flushOutputSizeHisto;
+  private MutableFastCounter flushedMemstoreBytes;
+  private MutableFastCounter flushedOutputBytes;
+
+  // compaction related metrics
+  private MetricHistogram compactionTimeHisto;
+  private MetricHistogram compactionInputFileCountHisto;
+  private MetricHistogram compactionInputSizeHisto;
+  private MetricHistogram compactionOutputFileCountHisto;
+  private MetricHistogram compactionOutputSizeHisto;
+  private MutableFastCounter compactedInputBytes;
+  private MutableFastCounter compactedOutputBytes;
+
+  private MetricHistogram majorCompactionTimeHisto;
+  private MetricHistogram majorCompactionInputFileCountHisto;
+  private MetricHistogram majorCompactionInputSizeHisto;
+  private MetricHistogram majorCompactionOutputFileCountHisto;
+  private MetricHistogram majorCompactionOutputSizeHisto;
+  private MutableFastCounter majorCompactedInputBytes;
+  private MutableFastCounter majorCompactedOutputBytes;
+
   public MetricsTableSourceImpl(String tblName,
       MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
     LOG.debug("Creating new MetricsTableSourceImpl for table ");
     this.tableName = TableName.valueOf(tblName);
     this.agg = aggregate;
-    agg.register(tblName, this);
+
     this.tableWrapperAgg = tblWrapperAgg;
     this.registry = agg.getMetricsRegistry();
     this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() +
@@ -60,6 +135,79 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   }
 
   @Override
+  public synchronized void registerMetrics() {
+    flushTimeHisto = registry.newTimeHistogram(tableNamePrefix + FLUSH_TIME, FLUSH_TIME_DESC);
+    flushMemstoreSizeHisto =
+        registry.newSizeHistogram(tableNamePrefix + FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC);
+    flushOutputSizeHisto =
+        registry.newSizeHistogram(tableNamePrefix + FLUSH_OUTPUT_SIZE, FLUSH_OUTPUT_SIZE_DESC);
+    flushedOutputBytes =
+        registry.newCounter(tableNamePrefix + FLUSHED_OUTPUT_BYTES, FLUSHED_OUTPUT_BYTES_DESC, 0L);
+    flushedMemstoreBytes = registry.newCounter(tableNamePrefix + FLUSHED_MEMSTORE_BYTES,
+      FLUSHED_MEMSTORE_BYTES_DESC, 0L);
+
+    compactionTimeHisto =
+        registry.newTimeHistogram(tableNamePrefix + COMPACTION_TIME, COMPACTION_TIME_DESC);
+    compactionInputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + COMPACTION_INPUT_FILE_COUNT, COMPACTION_INPUT_FILE_COUNT_DESC);
+    compactionInputSizeHisto = registry.newSizeHistogram(tableNamePrefix + COMPACTION_INPUT_SIZE,
+      COMPACTION_INPUT_SIZE_DESC);
+    compactionOutputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + COMPACTION_OUTPUT_FILE_COUNT, COMPACTION_OUTPUT_FILE_COUNT_DESC);
+    compactionOutputSizeHisto = registry.newSizeHistogram(tableNamePrefix + COMPACTION_OUTPUT_SIZE,
+      COMPACTION_OUTPUT_SIZE_DESC);
+    compactedInputBytes = registry.newCounter(tableNamePrefix + COMPACTED_INPUT_BYTES,
+      COMPACTED_INPUT_BYTES_DESC, 0L);
+    compactedOutputBytes = registry.newCounter(tableNamePrefix + COMPACTED_OUTPUT_BYTES,
+      COMPACTED_OUTPUT_BYTES_DESC, 0L);
+
+    majorCompactionTimeHisto = registry.newTimeHistogram(tableNamePrefix + MAJOR_COMPACTION_TIME,
+      MAJOR_COMPACTION_TIME_DESC);
+    majorCompactionInputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_INPUT_FILE_COUNT, MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC);
+    majorCompactionInputSizeHisto = registry.newSizeHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_INPUT_SIZE, MAJOR_COMPACTION_INPUT_SIZE_DESC);
+    majorCompactionOutputFileCountHisto =
+        registry.newHistogram(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_FILE_COUNT,
+          MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC);
+    majorCompactionOutputSizeHisto = registry.newSizeHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_OUTPUT_SIZE, MAJOR_COMPACTION_OUTPUT_SIZE_DESC);
+    majorCompactedInputBytes = registry.newCounter(tableNamePrefix + MAJOR_COMPACTED_INPUT_BYTES,
+      MAJOR_COMPACTED_INPUT_BYTES_DESC, 0L);
+    majorCompactedOutputBytes = registry.newCounter(tableNamePrefix + MAJOR_COMPACTED_OUTPUT_BYTES,
+      MAJOR_COMPACTED_OUTPUT_BYTES_DESC, 0L);
+
+    splitTimeHisto = registry.newTimeHistogram(tableNamePrefix + SPLIT_KEY);
+    splitRequest = registry.newCounter(tableNamePrefix + SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
+    splitSuccess = registry.newCounter(tableNamePrefix + SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
+  }
+
+  private void deregisterMetrics() {
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_MEMSTORE_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + FLUSHED_OUTPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + FLUSHED_MEMSTORE_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_INPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_INPUT_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_OUTPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + COMPACTED_INPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + COMPACTED_OUTPUT_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_INPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_INPUT_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + MAJOR_COMPACTED_INPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + MAJOR_COMPACTED_OUTPUT_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + SPLIT_KEY);
+    registry.removeMetric(tableNamePrefix + SPLIT_REQUEST_KEY);
+    registry.removeMetric(tableNamePrefix + SPLIT_SUCCESS_KEY);
+  }
+
+  @Override
   public void close() {
     boolean wasClosed = closed.getAndSet(true);
 
@@ -70,7 +218,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
 
     // Before removing the metrics remove this table from the aggregate table bean.
     // This should mean that it's unlikely that snapshot and close happen at the same time.
-    agg.deregister(tableName.getNameAsString());
+    agg.deleteTableSource(tableName.getNameAsString());
 
     // While it's un-likely that snapshot and close happen at the same time it's still possible.
     // So grab the lock to ensure that all calls to snapshot are done before we remove the metrics
@@ -78,6 +226,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing table Metrics for table ");
       }
+      deregisterMetrics();
       tableWrapperAgg = null;
     }
   }
@@ -122,27 +271,55 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
       }
 
       if (this.tableWrapperAgg != null) {
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.READ_REQUEST_COUNT,
-          MetricsTableSource.READ_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getReadRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.CP_REQUEST_COUNT,
-          MetricsTableSource.CP_REQUEST_COUNT_DESC),
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.CP_REQUEST_COUNT,
+            MetricsRegionServerSource.CP_REQUEST_COUNT_DESC),
           tableWrapperAgg.getCpRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.WRITE_REQUEST_COUNT,
-          MetricsTableSource.WRITE_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getWriteRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.TOTAL_REQUEST_COUNT,
-          MetricsTableSource.TOTAL_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
-        mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.MEMSTORE_SIZE,
-          MetricsTableSource.MEMSTORE_SIZE_DESC),
-          tableWrapperAgg.getMemStoresSize(tableName.getNameAsString()));
-        mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.STORE_FILE_SIZE,
-          MetricsTableSource.STORE_FILE_SIZE_DESC),
-          tableWrapperAgg.getStoreFilesSize(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.READ_REQUEST_COUNT,
+            MetricsRegionServerSource.READ_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getReadRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(
+            Interns.info(tableNamePrefix + MetricsRegionServerSource.FILTERED_READ_REQUEST_COUNT,
+                MetricsRegionServerSource.FILTERED_READ_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getFilteredReadRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
+            MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getWriteRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.TOTAL_REQUEST_COUNT,
+            MetricsRegionServerSource.TOTAL_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MEMSTORE_SIZE,
+            MetricsRegionServerSource.MEMSTORE_SIZE_DESC),
+            tableWrapperAgg.getMemStoreSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STOREFILE_COUNT,
+            MetricsRegionServerSource.STOREFILE_COUNT_DESC),
+            tableWrapperAgg.getNumStoreFiles(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STOREFILE_SIZE,
+            MetricsRegionServerSource.STOREFILE_SIZE_DESC),
+            tableWrapperAgg.getStoreFileSize(tableName.getNameAsString()));
         mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.TABLE_SIZE,
           MetricsTableSource.TABLE_SIZE_DESC),
           tableWrapperAgg.getTableSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.AVERAGE_REGION_SIZE,
+            MetricsRegionServerSource.AVERAGE_REGION_SIZE_DESC),
+            tableWrapperAgg.getAvgRegionSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.REGION_COUNT,
+            MetricsRegionServerSource.REGION_COUNT_DESC),
+            tableWrapperAgg.getNumRegions(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STORE_COUNT,
+            MetricsRegionServerSource.STORE_COUNT_DESC),
+            tableWrapperAgg.getNumStores(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MAX_STORE_FILE_AGE,
+            MetricsRegionServerSource.MAX_STORE_FILE_AGE_DESC),
+            tableWrapperAgg.getMaxStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MIN_STORE_FILE_AGE,
+            MetricsRegionServerSource.MIN_STORE_FILE_AGE_DESC),
+            tableWrapperAgg.getMinStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.AVG_STORE_FILE_AGE,
+            MetricsRegionServerSource.AVG_STORE_FILE_AGE_DESC),
+            tableWrapperAgg.getAvgStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES,
+            MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC),
+            tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString()));
       }
     }
   }
@@ -177,4 +354,80 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   public String getTableNamePrefix() {
     return tableNamePrefix;
   }
+
+  @Override
+  public void incrSplitRequest() {
+    splitRequest.incr();
+  }
+
+  @Override
+  public void incrSplitSuccess() {
+    splitSuccess.incr();
+  }
+
+  @Override
+  public void updateSplitTime(long t) {
+    splitTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFlushTime(long t) {
+    flushTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFlushMemstoreSize(long bytes) {
+    flushMemstoreSizeHisto.add(bytes);
+    flushedMemstoreBytes.incr(bytes);
+  }
+
+  @Override
+  public void updateFlushOutputSize(long bytes) {
+    flushOutputSizeHisto.add(bytes);
+    flushedOutputBytes.incr(bytes);
+  }
+
+  @Override
+  public void updateCompactionTime(boolean isMajor, long t) {
+    compactionTimeHisto.add(t);
+    if (isMajor) {
+      majorCompactionTimeHisto.add(t);
+    }
+  }
+
+  @Override
+  public void updateCompactionInputFileCount(boolean isMajor, long c) {
+    compactionInputFileCountHisto.add(c);
+    if (isMajor) {
+      majorCompactionInputFileCountHisto.add(c);
+    }
+  }
+
+  @Override
+  public void updateCompactionInputSize(boolean isMajor, long bytes) {
+    compactionInputSizeHisto.add(bytes);
+    compactedInputBytes.incr(bytes);
+    if (isMajor) {
+      majorCompactionInputSizeHisto.add(bytes);
+      majorCompactedInputBytes.incr(bytes);
+    }
+  }
+
+  @Override
+  public void updateCompactionOutputFileCount(boolean isMajor, long c) {
+    compactionOutputFileCountHisto.add(c);
+    if (isMajor) {
+      majorCompactionOutputFileCountHisto.add(c);
+    }
+  }
+
+  @Override
+  public void updateCompactionOutputSize(boolean isMajor, long bytes) {
+    compactionOutputSizeHisto.add(bytes);
+    compactedOutputBytes.incr(bytes);
+    if (isMajor) {
+      majorCompactionOutputSizeHisto.add(bytes);
+      majorCompactedOutputBytes.incr(bytes);
+    }
+  }
 }
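
Each per-table metric above is keyed as the tableNamePrefix plus one of the shared MetricsRegionServerSource constants, and deregisterMetrics() removes exactly the keys registerMetrics() created, so nothing is left behind in the DynamicMetricsRegistry after a table's last region closes. A hedged sketch of that add/use/remove pairing for one histogram and one counter, using only the registry calls that appear in the patch; the literal key names, the prefix value, and the class/method names here are illustrative, not copied from the commit:

    import org.apache.hadoop.metrics2.MetricHistogram;
    import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableFastCounter;

    final class TableMetricLifecycleSketch {
      // Registers a time histogram and a counter under a per-table prefix,
      // records one flush, then removes the same keys again on close.
      static void demo(DynamicMetricsRegistry registry, String prefix) {
        MetricHistogram flushTime =
            registry.newTimeHistogram(prefix + "flushTime", "Histogram of flush durations");
        MutableFastCounter flushedBytes =
            registry.newCounter(prefix + "flushedOutputBytes", "Bytes flushed to disk", 0L);

        flushTime.add(120L);        // a 120 ms flush
        flushedBytes.incr(4096L);   // that wrote 4 KB

        registry.removeHistogramMetrics(prefix + "flushTime");
        registry.removeMetric(prefix + "flushedOutputBytes");
      }
    }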

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
new file mode 100644
index 0000000..4ba0548
--- /dev/null
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
+
+  private String tableName;
+
+  public MetricsTableWrapperStub(String tableName) {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public long getReadRequestCount(String table) {
+    return 10;
+  }
+
+  @Override
+  public long getWriteRequestCount(String table) {
+    return 20;
+  }
+
+  @Override
+  public long getTotalRequestsCount(String table) {
+    return 30;
+  }
+
+  @Override
+  public long getFilteredReadRequestCount(String table) {
+    return 40;
+  }
+
+  @Override
+  public long getMemStoreSize(String table) {
+    return 1000;
+  }
+
+  @Override
+  public long getStoreFileSize(String table) {
+    return 2000;
+  }
+
+  @Override
+  public long getTableSize(String table) {
+    return 3000;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public long getNumRegions(String table) {
+    return 11;
+  }
+
+  @Override
+  public long getNumStores(String table) {
+    return 22;
+  }
+
+  @Override
+  public long getNumStoreFiles(String table) {
+    return 33;
+  }
+
+  @Override
+  public long getMaxStoreFileAge(String table) {
+    return 44;
+  }
+
+  @Override
+  public long getMinStoreFileAge(String table) {
+    return 55;
+  }
+
+  @Override
+  public long getAvgStoreFileAge(String table) {
+    return 66;
+  }
+
+  @Override
+  public long getNumReferenceFiles(String table) {
+    return 77;
+  }
+
+  @Override
+  public long getAvgRegionSize(String table) {
+    return 88;
+  }
+
+  @Override
+  public long getCpRequestsCount(String table) { return 99; }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
index 4953b62..6ff8375 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
@@ -201,5 +201,10 @@ public class TestMetricsRegionSourceImpl {
     public long getMaxFlushQueueSize() {
       return 0;
     }
+
+    @Override
+    public long getTotalRequestCount() {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
index 4c2b40c..11177ed 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
@@ -44,10 +44,13 @@ public class TestMetricsTableSourceImpl {
     MetricsRegionServerSourceFactory metricsFact =
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
 
-    MetricsTableSource one = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
-    MetricsTableSource oneClone = metricsFact.createTable("ONETABLE",
-            new TableWrapperStub("ONETABLE"));
-    MetricsTableSource two = metricsFact.createTable("TWOTABLE", new TableWrapperStub("TWOTABLE"));
+    MetricsTableSource one = metricsFact.createTable(
+        "ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
+    MetricsTableSource oneClone = metricsFact.createTable(
+        "ONETABLE",
+            new MetricsTableWrapperStub("ONETABLE"));
+    MetricsTableSource two = metricsFact.createTable(
+        "TWOTABLE", new MetricsTableWrapperStub("TWOTABLE"));
 
     assertEquals(0, one.compareTo(oneClone));
     assertEquals(one.hashCode(), oneClone.hashCode());
@@ -70,54 +73,8 @@ public class TestMetricsTableSourceImpl {
   public void testGetTableMetrics() {
     MetricsTableSource oneTbl =
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-        .createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
+        .createTable("ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
     assertEquals("ONETABLE", oneTbl.getTableName());
   }
 
-  static class TableWrapperStub implements MetricsTableWrapperAggregate {
-    private String tableName;
-
-    public TableWrapperStub(String tableName) {
-      this.tableName = tableName;
-    }
-
-    @Override
-    public long getReadRequestsCount(String table) {
-      return 10;
-    }
-
-    @Override
-    public long getCpRequestsCount(String table) {
-      return 15;
-    }
-
-    @Override
-    public long getWriteRequestsCount(String table) {
-      return 20;
-    }
-
-    @Override
-    public long getTotalRequestsCount(String table) {
-      return 30;
-    }
-
-    @Override
-    public long getMemStoresSize(String table) {
-      return 1000;
-    }
-
-    @Override
-    public long getStoreFilesSize(String table) {
-      return 2000;
-    }
-
-    @Override
-    public long getTableSize(String table) {
-      return 3000;
-    }
-
-    public String getTableName() {
-      return tableName;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4a86a03..85785e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2823,7 +2823,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus(msg);
 
     if (rsServices != null && rsServices.getMetrics() != null) {
-      rsServices.getMetrics().updateFlush(time - startTime,
+      rsServices.getMetrics().updateFlush(getTableDescriptor().getTableName().getNameAsString(),
+          time - startTime,
           mss.getDataSize(), flushedOutputFileSize);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 85175be..aec94d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1524,9 +1524,9 @@ public class HRegionServer extends HasThread implements
       // This call sets up an initialized replication and WAL. Later we start it up.
       setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
-      this.metricsRegionServer = new MetricsRegionServer(
-          new MetricsRegionServerWrapperImpl(this), conf);
       this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
+      this.metricsRegionServer = new MetricsRegionServer(
+          new MetricsRegionServerWrapperImpl(this), conf, metricsTable);
       // Now that we have a metrics source, start the pause monitor
       this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
       pauseMonitor.start();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a040184..418eecc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1430,9 +1430,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     long now = EnvironmentEdgeManager.currentTime();
     if (region.getRegionServerServices() != null
         && region.getRegionServerServices().getMetrics() != null) {
-      region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
-          now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
-          outputBytes);
+      region.getRegionServerServices().getMetrics().updateCompaction(
+          region.getTableDescriptor().getTableName().getNameAsString(),
+          cr.isMajor(), now - compactionStartTime, cr.getFiles().size(),
+          newFiles.size(), cr.getSize(), outputBytes);
+
     }
 
     logCompactionEndMessage(cr, sfs, now, compactionStartTime);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 3ff6131..21534ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -46,16 +46,19 @@ public class MetricsRegionServer {
   private MetricsRegionServerSource serverSource;
   private MetricsRegionServerWrapper regionServerWrapper;
   private RegionServerTableMetrics tableMetrics;
+  private final MetricsTable metricsTable;
   private MetricsRegionServerQuotaSource quotaSource;
 
   private MetricRegistry metricRegistry;
   private Timer bulkLoadTimer;
 
-  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf) {
+  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf,
+      MetricsTable metricsTable) {
     this(regionServerWrapper,
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
             .createServer(regionServerWrapper),
-        createTableMetrics(conf));
+        createTableMetrics(conf),
+        metricsTable);
 
     // Create hbase-metrics module based metrics. The registry should already be registered by the
     // MetricsRegionServerSource
@@ -69,10 +72,12 @@ public class MetricsRegionServer {
 
   MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
                       MetricsRegionServerSource serverSource,
-                      RegionServerTableMetrics tableMetrics) {
+                      RegionServerTableMetrics tableMetrics,
+                      MetricsTable metricsTable) {
     this.regionServerWrapper = regionServerWrapper;
     this.serverSource = serverSource;
     this.tableMetrics = tableMetrics;
+    this.metricsTable = metricsTable;
   }
 
   /**
@@ -196,19 +201,34 @@ public class MetricsRegionServer {
     serverSource.incrSplitSuccess();
   }
 
-  public void updateFlush(long t, long memstoreSize, long fileSize) {
+  public void updateFlush(String table, long t, long memstoreSize, long fileSize) {
     serverSource.updateFlushTime(t);
     serverSource.updateFlushMemStoreSize(memstoreSize);
     serverSource.updateFlushOutputSize(fileSize);
+
+    if (table != null) {
+      metricsTable.updateFlushTime(table, t);
+      metricsTable.updateFlushMemstoreSize(table, memstoreSize);
+      metricsTable.updateFlushOutputSize(table, fileSize);
+    }
+
   }
 
-  public void updateCompaction(boolean isMajor, long t, int inputFileCount, int outputFileCount,
+  public void updateCompaction(String table, boolean isMajor, long t, int inputFileCount, int outputFileCount,
       long inputBytes, long outputBytes) {
     serverSource.updateCompactionTime(isMajor, t);
     serverSource.updateCompactionInputFileCount(isMajor, inputFileCount);
     serverSource.updateCompactionOutputFileCount(isMajor, outputFileCount);
     serverSource.updateCompactionInputSize(isMajor, inputBytes);
     serverSource.updateCompactionOutputSize(isMajor, outputBytes);
+
+    if (table != null) {
+      metricsTable.updateCompactionTime(table, isMajor, t);
+      metricsTable.updateCompactionInputFileCount(table, isMajor, inputFileCount);
+      metricsTable.updateCompactionOutputFileCount(table, isMajor, outputFileCount);
+      metricsTable.updateCompactionInputSize(table, isMajor, inputBytes);
+      metricsTable.updateCompactionOutputSize(table, isMajor, outputBytes);
+    }
   }
 
   public void updateBulkLoad(long millis) {
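
With the widened signatures above, the flush and compaction paths pass the owning table's name through (the HRegion and HStore hunks shown earlier in this message do exactly that), and the per-table side is only updated when a non-null name is supplied. A small hedged sketch of a caller; the MetricsRegionServer instance, the TableName, the numeric values, and the class/method names are assumptions for this example:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;

    final class PerTableUpdateSketch {
      // Reports one flush and one minor compaction for the region's table,
      // mirroring how HRegion and HStore call MetricsRegionServer after this patch.
      static void report(MetricsRegionServer metrics, TableName tableName) {
        String table = tableName.getNameAsString();

        // flush: duration (ms), memstore bytes flushed, resulting file size
        metrics.updateFlush(table, 250L, 64L * 1024 * 1024, 20L * 1024 * 1024);

        // minor compaction: duration (ms), input/output file counts, input/output bytes
        metrics.updateCompaction(table, false, 4000L, 5, 1,
            300L * 1024 * 1024, 280L * 1024 * 1024);
      }
    }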

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index b293cdf..871dbd8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -167,6 +167,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
   }
 
   @Override
+  public long getTotalRequestCount() {
+    return getReadRequestCount() + getWriteRequestCount();
+  }
+
+  @Override
   public long getNumCompactionsFailed() {
     return this.region.compactionsFailed.sum();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
index b853c75..a3f0dff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
@@ -24,19 +24,67 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class MetricsTable {
   private final MetricsTableAggregateSource tableSourceAgg;
-  private MetricsTableWrapperAggregate tableWrapperAgg;
+  private MetricsTableWrapperAggregate wrapper;
 
   public MetricsTable(final MetricsTableWrapperAggregate wrapper) {
     tableSourceAgg = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
                                              .getTableAggregate();
-    this.tableWrapperAgg = wrapper;
+    this.wrapper = wrapper;
   }
 
   public MetricsTableWrapperAggregate getTableWrapperAgg() {
-    return tableWrapperAgg;
+    return wrapper;
   }
 
   public MetricsTableAggregateSource getTableSourceAgg() {
     return tableSourceAgg;
   }
+
+  public void incrSplitRequest(String table) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).incrSplitRequest();
+  }
+
+  public void incrSplitSuccess(String table) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).incrSplitSuccess();
+  }
+
+  public void updateSplitTime(String table, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateSplitTime(t);
+  }
+
+  public void updateFlushTime(String table, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushTime(t);
+  }
+
+  public void updateFlushMemstoreSize(String table, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushMemstoreSize(bytes);
+  }
+
+  public void updateFlushOutputSize(String table, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushOutputSize(bytes);
+  }
+
+  public void updateCompactionTime(String table, boolean isMajor, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateCompactionTime(isMajor, t);
+  }
+
+  public void updateCompactionInputFileCount(String table, boolean isMajor, long c) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionInputFileCount(isMajor, c);
+  }
+
+  public void updateCompactionInputSize(String table, boolean isMajor, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionInputSize(isMajor, bytes);
+  }
+
+  public void updateCompactionOutputFileCount(String table, boolean isMajor, long c) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionOutputFileCount(isMajor, c);
+  }
+
+  public void updateCompactionOutputSize(String table, boolean isMajor, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionOutputSize(isMajor, bytes);
+  }
 }
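
A minimal sketch, not from the patch, of what one of the delegating helpers above expands to; the per-table source is created lazily and registered on first use, so repeated calls for the same table reuse the same source. Here wrapper stands for any MetricsTableWrapperAggregate implementation (the tests in this change use MetricsTableWrapperStub).

  MetricsTable mt = new MetricsTable(wrapper);
  mt.updateFlushTime("ns:tbl", 15L);
  // ...is roughly equivalent to:
  mt.getTableSourceAgg()
      .getOrCreateTableSource("ns:tbl", mt.getTableWrapperAgg())
      .updateFlushTime(15L);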

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
index 9c4e9d7..6b97390 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
@@ -43,7 +43,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
   private Runnable runnable;
   private long period;
   private ScheduledFuture<?> tableMetricsUpdateTask;
-  private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap
+    = new ConcurrentHashMap<>();
 
   public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
     this.regionServer = regionServer;
@@ -51,8 +52,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000;
     this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
     this.runnable = new TableMetricsWrapperRunnable();
-    this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, this.period,
-      TimeUnit.MILLISECONDS);
+    this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period,
+      this.period, TimeUnit.MILLISECONDS);
   }
 
   public class TableMetricsWrapperRunnable implements Runnable {
@@ -62,35 +63,43 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
 
       for (Region r : regionServer.getOnlineRegionsLocalContext()) {
-        TableName tbl= r.getTableDescriptor().getTableName();
-        MetricsTableValues metricsTable = localMetricsTableMap.get(tbl);
-        if (metricsTable == null) {
-          metricsTable = new MetricsTableValues();
-          localMetricsTableMap.put(tbl, metricsTable);
+        TableName tbl = r.getTableDescriptor().getTableName();
+        MetricsTableValues mt = localMetricsTableMap.get(tbl);
+        if (mt == null) {
+          mt = new MetricsTableValues();
+          localMetricsTableMap.put(tbl, mt);
         }
-        long tempStorefilesSize = 0;
-        for (Store store : r.getStores()) {
-          tempStorefilesSize += store.getStorefilesSize();
+        if (r.getStores() != null) {
+          for (Store store : r.getStores()) {
+            mt.storeFileCount += store.getStorefilesCount();
+            mt.memstoreSize += (store.getMemStoreSize().getDataSize() +
+              store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize());
+            mt.storeFileSize += store.getStorefilesSize();
+            mt.referenceFileCount += store.getNumReferenceFiles();
+
+            mt.maxStoreFileAge = Math.max(mt.maxStoreFileAge, store.getMaxStoreFileAge().getAsLong());
+            mt.minStoreFileAge = Math.min(mt.minStoreFileAge, store.getMinStoreFileAge().getAsLong());
+            mt.totalStoreFileAge = (long)store.getAvgStoreFileAge().getAsDouble() *
+                store.getStorefilesCount();
+            mt.storeCount += 1;
+          }
+          mt.regionCount += 1;
+
+          mt.readRequestCount += r.getReadRequestsCount();
+          mt.filteredReadRequestCount += getFilteredReadRequestCount(tbl.getNameAsString());
+          mt.writeRequestCount += r.getWriteRequestsCount();
+
         }
-        metricsTable.setMemStoresSize(metricsTable.getMemStoresSize() + r.getMemStoreDataSize());
-        metricsTable.setStoreFilesSize(metricsTable.getStoreFilesSize() + tempStorefilesSize);
-        metricsTable.setTableSize(metricsTable.getMemStoresSize() + metricsTable.getStoreFilesSize());
-        metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount());
-        metricsTable.setCpRequestsCount(metricsTable.getCpRequestsCount() + r.getCpRequestsCount());
-        metricsTable.setWriteRequestsCount(metricsTable.getWriteRequestsCount() + r.getWriteRequestsCount());
-        metricsTable.setTotalRequestsCount(metricsTable.getReadRequestsCount()
-            + metricsTable.getWriteRequestsCount() + metricsTable.getCpRequestsCount());
       }
 
-      for(Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
+      for (Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
         TableName tbl = entry.getKey();
         if (metricsTableMap.get(tbl) == null) {
-          MetricsTableSource tableSource = CompatibilitySingletonFactory
-              .getInstance(MetricsRegionServerSourceFactory.class).createTable(tbl.getNameAsString(),
-                MetricsTableWrapperAggregateImpl.this);
+          // this creates the table source for this wrapper and registers its metrics
           CompatibilitySingletonFactory
-          .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate()
-          .register(tbl.getNameAsString(), tableSource);
+              .getInstance(MetricsRegionServerSourceFactory.class)
+              .getTableAggregate()
+              .getOrCreateTableSource(tbl.getNameAsString(), MetricsTableWrapperAggregateImpl.this);
         }
         metricsTableMap.put(entry.getKey(), entry.getValue());
       }
@@ -99,7 +108,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       MetricsTableAggregateSource agg = CompatibilitySingletonFactory
           .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
       for (TableName table : existingTableNames) {
-        agg.deregister(table.getNameAsString());
+        agg.deleteTableSource(table.getNameAsString());
         if (metricsTableMap.get(table) != null) {
           metricsTableMap.remove(table);
         }
@@ -108,12 +117,12 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
   }
 
   @Override
-  public long getReadRequestsCount(String table) {
+  public long getReadRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getReadRequestsCount();
+      return metricsTable.readRequestCount;
     }
   }
 
@@ -123,17 +132,25 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getCpRequestsCount();
+      return metricsTable.cpRequestCount;
     }
   }
 
+  public long getFilteredReadRequestCount(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
+    }
+    return metricsTable.filteredReadRequestCount;
+  }
+
   @Override
-  public long getWriteRequestsCount(String table) {
+  public long getWriteRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getWriteRequestsCount();
+      return metricsTable.writeRequestCount;
     }
   }
 
@@ -143,27 +160,27 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getTotalRequestsCount();
+      return metricsTable.readRequestCount + metricsTable.writeRequestCount;
     }
   }
 
   @Override
-  public long getMemStoresSize(String table) {
+  public long getMemStoreSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getMemStoresSize();
+      return metricsTable.memstoreSize;
     }
   }
 
   @Override
-  public long getStoreFilesSize(String table) {
+  public long getStoreFileSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getStoreFilesSize();
+      return metricsTable.storeFileSize;
     }
   }
 
@@ -173,80 +190,113 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
     if (metricsTable == null) {
       return 0;
     } else {
-      return metricsTable.getTableSize();
+      return metricsTable.memstoreSize + metricsTable.storeFileSize;
     }
   }
 
-  @Override
-  public void close() throws IOException {
-    tableMetricsUpdateTask.cancel(true);
-  }
-
-  private static class MetricsTableValues {
-
-    private long totalRequestsCount;
-    private long readRequestsCount;
-    private long cpRequestsCount;
-    private long writeRequestsCount;
-    private long memstoresSize;
-    private long storeFilesSize;
-    private long tableSize;
-
-    public long getTotalRequestsCount() {
-      return totalRequestsCount;
-    }
-
-    public void setTotalRequestsCount(long totalRequestsCount) {
-      this.totalRequestsCount = totalRequestsCount;
-    }
-
-    public long getReadRequestsCount() {
-      return readRequestsCount;
+  public long getNumRegions(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.regionCount;
+  }
 
-    public void setReadRequestsCount(long readRequestsCount) {
-      this.readRequestsCount = readRequestsCount;
+  @Override
+  public long getNumStores(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.storeCount;
+  }
 
-    public long getCpRequestsCount() {
-      return cpRequestsCount;
+  @Override
+  public long getNumStoreFiles(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.storeFileCount;
+  }
 
-    public void setCpRequestsCount(long cpRequestsCount) {
-      this.cpRequestsCount = cpRequestsCount;
+  @Override
+  public long getMaxStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.maxStoreFileAge;
+  }
 
-    public long getWriteRequestsCount() {
-      return writeRequestsCount;
+  @Override
+  public long getMinStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.minStoreFileAge == Long.MAX_VALUE ? 0 : metricsTable.minStoreFileAge;
+  }
 
-    public void setWriteRequestsCount(long writeRequestsCount) {
-      this.writeRequestsCount = writeRequestsCount;
+  @Override
+  public long getAvgStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
 
-    public long getMemStoresSize() {
-      return memstoresSize;
-    }
+    return metricsTable.storeFileCount == 0
+        ? 0
+        : (metricsTable.totalStoreFileAge / metricsTable.storeFileCount);
+  }
 
-    public void setMemStoresSize(long memstoresSize) {
-      this.memstoresSize = memstoresSize;
+  @Override
+  public long getNumReferenceFiles(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.referenceFileCount;
+  }
 
-    public long getStoreFilesSize() {
-      return storeFilesSize;
+  @Override
+  public long getAvgRegionSize(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.regionCount == 0
+        ? 0
+        : (metricsTable.memstoreSize + metricsTable.storeFileSize) / metricsTable.regionCount;
+  }
 
-    public void setStoreFilesSize(long storeFilesSize) {
-      this.storeFilesSize = storeFilesSize;
+  public long getCpRequestCount(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.cpRequestCount;
+  }
 
-    public long getTableSize() {
-      return tableSize;
-    }
+  @Override
+  public void close() throws IOException {
+    tableMetricsUpdateTask.cancel(true);
+  }
 
-    public void setTableSize(long tableSize) {
-      this.tableSize = tableSize;
-    }
+  private static class MetricsTableValues {
+    long readRequestCount;
+    long filteredReadRequestCount;
+    long writeRequestCount;
+    long memstoreSize;
+    long regionCount;
+    long storeCount;
+    long storeFileCount;
+    long storeFileSize;
+    long maxStoreFileAge;
+    long minStoreFileAge = Long.MAX_VALUE;
+    long totalStoreFileAge;
+    long referenceFileCount;
+    long cpRequestCount;
   }
 
 }
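
The runnable above rebuilds per-table totals from every online region on each scheduled tick and then publishes them into the shared map that the getters read. A small, self-contained sketch of that pattern, with illustrative names only (this is not HBase API):

  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  class PerTableSnapshotAggregator {
    // illustrative stand-in for one online region's contribution to its table
    static final class RegionStats {
      final String table; final long readRequests; final long storeFileSize;
      RegionStats(String table, long readRequests, long storeFileSize) {
        this.table = table; this.readRequests = readRequests; this.storeFileSize = storeFileSize;
      }
    }

    static final class Values { long readRequests; long storeFileSize; }

    private final ConcurrentHashMap<String, Values> published = new ConcurrentHashMap<>();

    /** One tick: rebuild the totals from scratch, then publish the fresh snapshot. */
    void tick(List<RegionStats> onlineRegions) {
      Map<String, Values> local = new HashMap<>();
      for (RegionStats r : onlineRegions) {
        Values v = local.computeIfAbsent(r.table, k -> new Values());
        v.readRequests += r.readRequests;     // sum across all regions of the table
        v.storeFileSize += r.storeFileSize;
      }
      published.keySet().retainAll(local.keySet()); // drop tables with no online regions
      published.putAll(local);                      // readers now see the new totals
    }

    long getReadRequestCount(String table) {
      Values v = published.get(table);
      return v == null ? 0 : v.readRequests;
    }
  }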

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
index b15ffc2..e544206 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
@@ -167,4 +167,9 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
   public long getMaxFlushQueueSize() {
     return 6;
   }
+
+  @Override
+  public long getTotalRequestCount() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
deleted file mode 100644
index 785115b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
-
-  private String tableName;
-
-  public MetricsTableWrapperStub(String tableName) {
-    this.tableName = tableName;
-  }
-
-  @Override
-  public long getReadRequestsCount(String table) {
-    return 10;
-  }
-
-  @Override
-  public long getCpRequestsCount(String table) {
-    return 15;
-  }
-
-  @Override
-  public long getWriteRequestsCount(String table) {
-    return 20;
-  }
-
-  @Override
-  public long getTotalRequestsCount(String table) {
-    return 30;
-  }
-
-  @Override
-  public long getMemStoresSize(String table) {
-    return 1000;
-  }
-
-  @Override
-  public long getStoreFilesSize(String table) {
-    return 2000;
-  }
-
-  @Override
-  public long getTableSize(String table) {
-    return 3000;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index 369c206..fa8ea46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -57,7 +57,7 @@ public class TestMetricsRegionServer {
   @Before
   public void setUp() {
     wrapper = new MetricsRegionServerWrapperStub();
-    rsm = new MetricsRegionServer(wrapper, new Configuration(false));
+    rsm = new MetricsRegionServer(wrapper, new Configuration(false), null);
     serverSource = rsm.getMetricsSource();
   }
 
@@ -185,14 +185,14 @@ public class TestMetricsRegionServer {
 
   @Test
   public void testFlush() {
-    rsm.updateFlush(1, 2, 3);
+    rsm.updateFlush(null, 1, 2, 3);
     HELPER.assertCounter("flushTime_num_ops", 1, serverSource);
     HELPER.assertCounter("flushMemstoreSize_num_ops", 1, serverSource);
     HELPER.assertCounter("flushOutputSize_num_ops", 1, serverSource);
     HELPER.assertCounter("flushedMemstoreBytes", 2, serverSource);
     HELPER.assertCounter("flushedOutputBytes", 3, serverSource);
 
-    rsm.updateFlush(10, 20, 30);
+    rsm.updateFlush(null, 10, 20, 30);
     HELPER.assertCounter("flushTimeNumOps", 2, serverSource);
     HELPER.assertCounter("flushMemstoreSize_num_ops", 2, serverSource);
     HELPER.assertCounter("flushOutputSize_num_ops", 2, serverSource);
@@ -202,7 +202,7 @@ public class TestMetricsRegionServer {
 
   @Test
   public void testCompaction() {
-    rsm.updateCompaction(false, 1, 2, 3, 4, 5);
+    rsm.updateCompaction(null, false, 1, 2, 3, 4, 5);
     HELPER.assertCounter("compactionTime_num_ops", 1, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 1, serverSource);
     HELPER.assertCounter("compactionInputSize_num_ops", 1, serverSource);
@@ -210,7 +210,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("compactedInputBytes", 4, serverSource);
     HELPER.assertCounter("compactedoutputBytes", 5, serverSource);
 
-    rsm.updateCompaction(false, 10, 20, 30, 40, 50);
+    rsm.updateCompaction(null, false, 10, 20, 30, 40, 50);
     HELPER.assertCounter("compactionTime_num_ops", 2, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 2, serverSource);
     HELPER.assertCounter("compactionInputSize_num_ops", 2, serverSource);
@@ -219,7 +219,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("compactedoutputBytes", 55, serverSource);
 
     // do major compaction
-    rsm.updateCompaction(true, 100, 200, 300, 400, 500);
+    rsm.updateCompaction(null, true, 100, 200, 300, 400, 500);
 
     HELPER.assertCounter("compactionTime_num_ops", 3, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 3, serverSource);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e903ae5d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
index 969358d..b19d4b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompatibilityFactory;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -33,27 +35,108 @@ public class TestMetricsTableAggregate {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
+    HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
 
   public static MetricsAssertHelper HELPER =
-      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+  private String tableName = "testTableMetrics";
+  private String pre = "Namespace_default_table_" + tableName + "_metric_";
+
+  private MetricsTableWrapperStub tableWrapper;
+  private MetricsTable mt;
+  private MetricsRegionServerWrapper rsWrapper;
+  private MetricsRegionServer rsm;
+  private MetricsTableAggregateSource agg;
+
+  @BeforeClass
+  public static void classSetUp() {
+    HELPER.init();
+  }
+
+  @Before
+  public void setUp() {
+    tableWrapper = new MetricsTableWrapperStub(tableName);
+    mt = new MetricsTable(tableWrapper);
+    rsWrapper = new MetricsRegionServerWrapperStub();
+    Configuration conf = new Configuration();
+    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
+    agg = mt.getTableSourceAgg();
+  }
 
   @Test
-  public void testTableWrapperAggregateMetrics() throws IOException {
-    String tableName = "testTableMetrics";
-    MetricsTableWrapperStub tableWrapper = new MetricsTableWrapperStub(tableName);
-    CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-        .createTable(tableName, tableWrapper);
-    MetricsTableAggregateSource agg = CompatibilitySingletonFactory
-        .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
-
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_readRequestCount", 10, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_cpRequestCount", 15, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_writeRequestCount", 20, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_totalRequestCount", 30, agg);
-
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_memstoreSize", 1000, agg);
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_storeFileSize", 2000, agg);
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_tableSize", 3000, agg);
+  public void testRequestMetrics() throws IOException {
+    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
+    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
+    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
   }
+
+  @Test
+  public void testRegionAndStoreMetrics() throws IOException {
+    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
+    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
+    HELPER.assertGauge(pre + "tableSize", 3000, agg);
+
+    HELPER.assertGauge(pre + "regionCount", 11, agg);
+    HELPER.assertGauge(pre + "storeCount", 22, agg);
+    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
+    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
+    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
+    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
+    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
+    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
+  }
+
+  @Test
+  public void testFlush() {
+    rsm.updateFlush(tableName, 1, 2, 3);
+    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
+    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
+
+    rsm.updateFlush(tableName, 10, 20, 30);
+    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
+    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
+  }
+
+  @Test
+  public void testCompaction() {
+    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
+
+    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
+
+    // do major compaction
+    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
+
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
+
+    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
+    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
+  }
+
 }
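
For reference, the per-table keys asserted above are composed from the namespace and table name. A minimal sketch of the key for the flush-time histogram ops counter, assuming the default namespace as in the test:

  String tableName = "testTableMetrics";
  String pre = "Namespace_default_table_" + tableName + "_metric_";
  String key = pre + "flushTime_num_ops";
  // -> Namespace_default_table_testTableMetrics_metric_flushTime_num_ops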


[2/3] hbase git commit: HBASE-15728 Add Per-Table metrics back

Posted by ap...@apache.org.
HBASE-15728 Add Per-Table metrics back

Signed-off-by: Andrew Purtell <ap...@apache.org>

Conflicts:
	hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
	hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
	hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
	hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
	hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
	hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72a29211
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72a29211
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72a29211

Branch: refs/heads/branch-2
Commit: 72a29211cc3bb2d848d1744aa63481856962a189
Parents: 737ac48
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 15:08:29 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 29 16:27:15 2018 -0700

----------------------------------------------------------------------
 .../regionserver/MetricsRegionServerSource.java |   6 +-
 .../regionserver/MetricsRegionWrapper.java      |   2 +
 .../MetricsTableAggregateSource.java            |   8 +-
 .../hbase/regionserver/MetricsTableSource.java  |  88 +++++-
 .../MetricsTableWrapperAggregate.java           |  58 +++-
 .../MetricsRegionServerSourceImpl.java          |   3 +-
 .../MetricsTableAggregateSourceImpl.java        |  43 ++-
 .../regionserver/MetricsTableSourceImpl.java    | 287 +++++++++++++++++--
 .../regionserver/MetricsTableWrapperStub.java   | 107 +++++++
 .../TestMetricsRegionSourceImpl.java            |   5 +
 .../TestMetricsTableSourceImpl.java             |  54 +---
 .../hadoop/hbase/regionserver/HRegion.java      |   3 +-
 .../hbase/regionserver/HRegionServer.java       |   4 +-
 .../hadoop/hbase/regionserver/HStore.java       |   8 +-
 .../hbase/regionserver/MetricsRegionServer.java |  30 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   5 +
 .../hadoop/hbase/regionserver/MetricsTable.java |  54 +++-
 .../MetricsTableWrapperAggregateImpl.java       | 241 ++++++++++------
 .../regionserver/MetricsRegionWrapperStub.java  |   5 +
 .../regionserver/MetricsTableWrapperStub.java   |  62 ----
 .../regionserver/TestMetricsRegionServer.java   |  12 +-
 .../regionserver/TestMetricsTableAggregate.java | 120 ++++++--
 22 files changed, 920 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 9c0a7b4..fc9cde7 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -254,14 +254,14 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
           "action at the RPC Server (Sums 'readRequestsCount' and 'writeRequestsCount'); counts" +
           "once per access whether a Put of 1M rows or a Get that returns 1M Results";
   String READ_REQUEST_COUNT = "readRequestCount";
+  String FILTERED_READ_REQUEST_COUNT = "filteredReadRequestCount";
+  String FILTERED_READ_REQUEST_COUNT_DESC =
+      "Number of read requests this region server has answered.";
   String READ_REQUEST_COUNT_DESC =
       "Number of read requests with non-empty Results that this RegionServer has answered.";
   String READ_REQUEST_RATE_PER_SECOND = "readRequestRatePerSecond";
   String READ_REQUEST_RATE_DESC =
       "Rate of answering the read requests by this region server per second.";
-  String FILTERED_READ_REQUEST_COUNT = "filteredReadRequestCount";
-  String FILTERED_READ_REQUEST_COUNT_DESC =
-    "Number of filtered read requests this RegionServer has answered.";
   String WRITE_REQUEST_COUNT = "writeRequestCount";
   String WRITE_REQUEST_COUNT_DESC =
       "Number of mutation requests this RegionServer has answered.";

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
index 491c1a9..5e98c61 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
@@ -102,6 +102,8 @@ public interface MetricsRegionWrapper {
    */
   long getWriteRequestCount();
 
+  long getTotalRequestCount();
+
   long getNumFilesCompacted();
 
   long getNumBytesCompacted();

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
index 39e2372..f746c98 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
@@ -52,18 +52,16 @@ public interface MetricsTableAggregateSource extends BaseSource {
   String NUMBER_OF_TABLES_DESC = "Number of tables in the metrics system";
 
   /**
-   * Register a MetricsTableSource as being open.
-   *
+   * Returns MetricsTableSource registered for the table. Creates one if not defined.
    * @param table The table name
-   * @param source the source for the table being opened.
    */
-  void register(String table, MetricsTableSource source);
+  MetricsTableSource getOrCreateTableSource(String table, MetricsTableWrapperAggregate wrapper);
 
   /**
    * Remove a table's source. This is called when regions of a table are closed.
    *
    * @param table The table name
    */
-  void deregister(String table);
+  void deleteTableSource(String table);
 
 }
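
A minimal usage sketch, not from the patch, of the reworked contract above: the aggregate now owns creation, so callers simply ask for the source and it is created and registered on first use. Here agg and wrapper stand for any MetricsTableAggregateSource and MetricsTableWrapperAggregate instances.

  MetricsTableSource src = agg.getOrCreateTableSource("ns:tbl", wrapper);   // created and registered once
  MetricsTableSource same = agg.getOrCreateTableSource("ns:tbl", wrapper);  // existing source returned
  src.incrSplitRequest();
  agg.deleteTableSource("ns:tbl");  // when the table's last region on this server closes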

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
index b8476bf..8b62741 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
@@ -20,23 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+import java.io.Closeable;
+
 /**
  * This interface will be implemented to allow region server to push table metrics into
  * MetricsRegionAggregateSource that will in turn push data to the Hadoop metrics system.
  */
 @InterfaceAudience.Private
-public interface MetricsTableSource extends Comparable<MetricsTableSource> {
-
-  String READ_REQUEST_COUNT = "readRequestCount";
-  String READ_REQUEST_COUNT_DESC = "Number of read requests";
-  String WRITE_REQUEST_COUNT = "writeRequestCount";
-  String WRITE_REQUEST_COUNT_DESC = "Number of write requests";
-  String TOTAL_REQUEST_COUNT = "totalRequestCount";
-  String TOTAL_REQUEST_COUNT_DESC = "Number of total requests";
-  String MEMSTORE_SIZE = "memstoreSize";
-  String MEMSTORE_SIZE_DESC = "The size of memory stores";
-  String STORE_FILE_SIZE = "storeFileSize";
-  String STORE_FILE_SIZE_DESC = "The size of store files size";
+public interface MetricsTableSource extends Comparable<MetricsTableSource>, Closeable {
+
   String TABLE_SIZE = "tableSize";
   String TABLE_SIZE_DESC = "Total size of the table in the region server";
 
@@ -45,11 +37,83 @@ public interface MetricsTableSource extends Comparable<MetricsTableSource> {
   /**
    * Close the table's metrics as all the region are closing.
    */
+  @Override
   void close();
 
+  void registerMetrics();
+
   /**
    * Get the aggregate source to which this reports.
    */
   MetricsTableAggregateSource getAggregateSource();
 
+  /**
+   * Update the split transaction time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateSplitTime(long t);
+
+  /**
+   * Increment the number of requested splits
+   */
+  void incrSplitRequest();
+
+  /**
+   * Increment number of successful splits
+   */
+  void incrSplitSuccess();
+
+  /**
+   * Update the flush time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFlushTime(long t);
+
+  /**
+   * Update the flush memstore size histogram
+   * @param bytes the number of bytes in the memstore
+   */
+  void updateFlushMemstoreSize(long bytes);
+
+  /**
+   * Update the flush output file size histogram
+   * @param bytes the number of bytes in the output file
+   */
+  void updateFlushOutputSize(long bytes);
+
+  /**
+   * Update the compaction time histogram, both major and minor
+   * @param isMajor whether compaction is a major compaction
+   * @param t time it took, in milliseconds
+   */
+  void updateCompactionTime(boolean isMajor, long t);
+
+  /**
+   * Update the compaction input number of files histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param c number of files
+   */
+  void updateCompactionInputFileCount(boolean isMajor, long c);
+
+  /**
+   * Update the compaction total input file size histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param bytes the number of bytes of the compaction input file
+   */
+  void updateCompactionInputSize(boolean isMajor, long bytes);
+
+  /**
+   * Update the compaction output number of files histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param c number of files
+   */
+  void updateCompactionOutputFileCount(boolean isMajor, long c);
+
+  /**
+   * Update the compaction total output file size
+   * @param isMajor whether compaction is a major compaction
+   * @param bytes the number of bytes of the compaction output file
+   */
+  void updateCompactionOutputSize(boolean isMajor, long bytes);
+
 }
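
A minimal sketch, not from the patch, of how a single flush would be recorded against one of these sources; src is assumed to have been obtained via MetricsTableAggregateSource#getOrCreateTableSource.

  src.updateFlushTime(120);              // milliseconds spent flushing
  src.updateFlushMemstoreSize(64 << 20); // bytes flushed out of the memstore
  src.updateFlushOutputSize(40 << 20);   // bytes written to the new store file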

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
index e1d2ab2..bf8b4c9 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
@@ -30,30 +30,78 @@ public interface MetricsTableWrapperAggregate {
   /**
    * Get the number of read requests that have been issued against this table
    */
-  long getReadRequestsCount(String table);
+  long getReadRequestCount(String table);
 
   /**
    * Get the number of write requests that have been issued against this table
    */
-  long getWriteRequestsCount(String table);
+  long getFilteredReadRequestCount(String table);
+  /**
+   * Get the number of write requests that have been issued for this table
+   */
+  long getWriteRequestCount(String table);
 
   /**
-   * Get the total number of requests that have been issued against this table
+   * Get the total number of requests that have been issued for this table
    */
   long getTotalRequestsCount(String table);
 
   /**
    * Get the memory store size against this table
    */
-  long getMemStoresSize(String table);
+  long getMemStoreSize(String table);
 
   /**
    * Get the store file size against this table
    */
-  long getStoreFilesSize(String table);
+  long getStoreFileSize(String table);
 
   /**
    * Get the table region size against this table
    */
   long getTableSize(String table);
+
+
+  /**
+   * Get the average region size for this table
+   */
+  long getAvgRegionSize(String table);
+
+  /**
+   * Get the number of regions hosted on this region server for this table
+   */
+  long getNumRegions(String table);
+
+  /**
+   * Get the number of stores hosted on this region server for this table
+   */
+  long getNumStores(String table);
+
+  /**
+   * Get the number of store files hosted for this table
+   */
+  long getNumStoreFiles(String table);
+
+  /**
+   * @return Max age of store files for this table
+   */
+  long getMaxStoreFileAge(String table);
+
+  /**
+   * @return Min age of store files for this table
+   */
+  long getMinStoreFileAge(String table);
+
+  /**
+   *  @return Average age of store files for this table
+   */
+  long getAvgStoreFileAge(String table);
+
+  /**
+   *  @return Number of reference files for this table
+   */
+  long getNumReferenceFiles(String table);
+
+
+
 }
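
A minimal sketch of how two of the derived values relate to these getters, matching MetricsTableWrapperAggregateImpl in this change; wrapper stands for any implementation and the table name is illustrative.

  long tableSize = wrapper.getMemStoreSize("ns:tbl") + wrapper.getStoreFileSize("ns:tbl");
  long regions = wrapper.getNumRegions("ns:tbl");
  long avgRegionSize = regions == 0 ? 0 : tableSize / regions;  // what getAvgRegionSize("ns:tbl") reports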

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 88d9e06..2551d17 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -54,9 +54,10 @@ public class MetricsRegionServerSourceImpl
   private final MutableFastCounter slowGet;
   private final MutableFastCounter slowIncrement;
   private final MutableFastCounter slowAppend;
+
+  // split related metrics
   private final MutableFastCounter splitRequest;
   private final MutableFastCounter splitSuccess;
-
   private final MetricHistogram splitTimeHisto;
 
   // flush related metrics

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
index dc91964..363ddd2 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.hbase.metrics.Interns;
 import org.apache.hadoop.metrics2.MetricsCollector;
@@ -46,22 +47,46 @@ public class MetricsTableAggregateSourceImpl extends BaseSourceImpl
     super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
   }
 
-  @Override
-  public void register(String table, MetricsTableSource source) {
-    tableSources.put(table, source);
+  private void register(MetricsTableSource source) {
+    synchronized (this) {
+      source.registerMetrics();
+    }
   }
 
   @Override
-  public void deregister(String table) {
+  public void deleteTableSource(String table) {
     try {
-      tableSources.remove(table);
+      synchronized (this) {
+        MetricsTableSource source = tableSources.remove(table);
+        if (source != null) {
+          source.close();
+        }
+      }
     } catch (Exception e) {
       // Ignored. If this errors out it means that someone is double
-      // closing the region source and the region is already nulled out.
-      LOG.info(
-        "Error trying to remove " + table + " from " + this.getClass().getSimpleName(),
-        e);
+      // closing the table source and the table metrics are already nulled out.
+      LOG.info("Error trying to remove " + table + " from " + getClass().getSimpleName(), e);
+    }
+  }
+
+  @Override
+  public MetricsTableSource getOrCreateTableSource(String table,
+      MetricsTableWrapperAggregate wrapper) {
+    MetricsTableSource source = tableSources.get(table);
+    if (source != null) {
+      return source;
+    }
+    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+      .createTable(table, wrapper);
+    MetricsTableSource prev = tableSources.putIfAbsent(table, source);
+
+    if (prev != null) {
+      return prev;
+    } else {
+      // register the new metrics now
+      register(source);
     }
+    return source;
   }
 
   /**

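A small, self-contained sketch of the create-or-get pattern used by getOrCreateTableSource above, with illustrative types (not HBase API): only the caller that wins putIfAbsent registers the newly created source, so two threads racing on the same table cannot double-register its metrics.

  import java.util.concurrent.ConcurrentHashMap;
  import java.util.function.Function;

  interface Registrable { void register(); }

  class CreateOrGetRegistry<V extends Registrable> {
    private final ConcurrentHashMap<String, V> sources = new ConcurrentHashMap<>();

    V getOrCreate(String key, Function<String, V> factory) {
      V existing = sources.get(key);
      if (existing != null) {
        return existing;               // fast path: already created and registered
      }
      V created = factory.apply(key);
      V raced = sources.putIfAbsent(key, created);
      if (raced != null) {
        return raced;                  // another thread won the race; use its instance
      }
      created.register();              // this thread won: register the metrics exactly once
      return created;
    }
  }
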
http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
index 37653cd..da78a2c 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
@@ -23,11 +23,57 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.metrics.Interns;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_MEMSTORE_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_MEMSTORE_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_MEMSTORE_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_MEMSTORE_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_INPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_INPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_REQUEST_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_REQUEST_KEY;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
+
 @InterfaceAudience.Private
 public class MetricsTableSourceImpl implements MetricsTableSource {
 
@@ -46,12 +92,41 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   private final TableName tableName;
   private final int hashCode;
 
+  // split related metrics
+  private MutableFastCounter splitRequest;
+  private MutableFastCounter splitSuccess;
+  private MetricHistogram splitTimeHisto;
+
+  // flush related metrics
+  private MetricHistogram flushTimeHisto;
+  private MetricHistogram flushMemstoreSizeHisto;
+  private MetricHistogram flushOutputSizeHisto;
+  private MutableFastCounter flushedMemstoreBytes;
+  private MutableFastCounter flushedOutputBytes;
+
+  // compaction related metrics
+  private MetricHistogram compactionTimeHisto;
+  private MetricHistogram compactionInputFileCountHisto;
+  private MetricHistogram compactionInputSizeHisto;
+  private MetricHistogram compactionOutputFileCountHisto;
+  private MetricHistogram compactionOutputSizeHisto;
+  private MutableFastCounter compactedInputBytes;
+  private MutableFastCounter compactedOutputBytes;
+
+  private MetricHistogram majorCompactionTimeHisto;
+  private MetricHistogram majorCompactionInputFileCountHisto;
+  private MetricHistogram majorCompactionInputSizeHisto;
+  private MetricHistogram majorCompactionOutputFileCountHisto;
+  private MetricHistogram majorCompactionOutputSizeHisto;
+  private MutableFastCounter majorCompactedInputBytes;
+  private MutableFastCounter majorCompactedOutputBytes;
+
   public MetricsTableSourceImpl(String tblName,
       MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
     LOG.debug("Creating new MetricsTableSourceImpl for table ");
     this.tableName = TableName.valueOf(tblName);
     this.agg = aggregate;
-    agg.register(tblName, this);
+
     this.tableWrapperAgg = tblWrapperAgg;
     this.registry = agg.getMetricsRegistry();
     this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() +
@@ -60,6 +135,79 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   }
 
   @Override
+  public synchronized void registerMetrics() {
+    flushTimeHisto = registry.newTimeHistogram(tableNamePrefix + FLUSH_TIME, FLUSH_TIME_DESC);
+    flushMemstoreSizeHisto =
+        registry.newSizeHistogram(tableNamePrefix + FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC);
+    flushOutputSizeHisto =
+        registry.newSizeHistogram(tableNamePrefix + FLUSH_OUTPUT_SIZE, FLUSH_OUTPUT_SIZE_DESC);
+    flushedOutputBytes =
+        registry.newCounter(tableNamePrefix + FLUSHED_OUTPUT_BYTES, FLUSHED_OUTPUT_BYTES_DESC, 0L);
+    flushedMemstoreBytes = registry.newCounter(tableNamePrefix + FLUSHED_MEMSTORE_BYTES,
+      FLUSHED_MEMSTORE_BYTES_DESC, 0L);
+
+    compactionTimeHisto =
+        registry.newTimeHistogram(tableNamePrefix + COMPACTION_TIME, COMPACTION_TIME_DESC);
+    compactionInputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + COMPACTION_INPUT_FILE_COUNT, COMPACTION_INPUT_FILE_COUNT_DESC);
+    compactionInputSizeHisto = registry.newSizeHistogram(tableNamePrefix + COMPACTION_INPUT_SIZE,
+      COMPACTION_INPUT_SIZE_DESC);
+    compactionOutputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + COMPACTION_OUTPUT_FILE_COUNT, COMPACTION_OUTPUT_FILE_COUNT_DESC);
+    compactionOutputSizeHisto = registry.newSizeHistogram(tableNamePrefix + COMPACTION_OUTPUT_SIZE,
+      COMPACTION_OUTPUT_SIZE_DESC);
+    compactedInputBytes = registry.newCounter(tableNamePrefix + COMPACTED_INPUT_BYTES,
+      COMPACTED_INPUT_BYTES_DESC, 0L);
+    compactedOutputBytes = registry.newCounter(tableNamePrefix + COMPACTED_OUTPUT_BYTES,
+      COMPACTED_OUTPUT_BYTES_DESC, 0L);
+
+    majorCompactionTimeHisto = registry.newTimeHistogram(tableNamePrefix + MAJOR_COMPACTION_TIME,
+      MAJOR_COMPACTION_TIME_DESC);
+    majorCompactionInputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_INPUT_FILE_COUNT, MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC);
+    majorCompactionInputSizeHisto = registry.newSizeHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_INPUT_SIZE, MAJOR_COMPACTION_INPUT_SIZE_DESC);
+    majorCompactionOutputFileCountHisto =
+        registry.newHistogram(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_FILE_COUNT,
+          MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC);
+    majorCompactionOutputSizeHisto = registry.newSizeHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_OUTPUT_SIZE, MAJOR_COMPACTION_OUTPUT_SIZE_DESC);
+    majorCompactedInputBytes = registry.newCounter(tableNamePrefix + MAJOR_COMPACTED_INPUT_BYTES,
+      MAJOR_COMPACTED_INPUT_BYTES_DESC, 0L);
+    majorCompactedOutputBytes = registry.newCounter(tableNamePrefix + MAJOR_COMPACTED_OUTPUT_BYTES,
+      MAJOR_COMPACTED_OUTPUT_BYTES_DESC, 0L);
+
+    splitTimeHisto = registry.newTimeHistogram(tableNamePrefix + SPLIT_KEY);
+    splitRequest = registry.newCounter(tableNamePrefix + SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
+    splitSuccess = registry.newCounter(tableNamePrefix + SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
+  }
+
+  private void deregisterMetrics() {
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_MEMSTORE_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + FLUSHED_OUTPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + FLUSHED_MEMSTORE_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_INPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_INPUT_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_OUTPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + COMPACTED_INPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + COMPACTED_OUTPUT_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_INPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_INPUT_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + MAJOR_COMPACTED_INPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + MAJOR_COMPACTED_OUTPUT_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + SPLIT_KEY);
+    registry.removeMetric(tableNamePrefix + SPLIT_REQUEST_KEY);
+    registry.removeMetric(tableNamePrefix + SPLIT_SUCCESS_KEY);
+  }
+
+  @Override
   public void close() {
     boolean wasClosed = closed.getAndSet(true);
 
@@ -70,7 +218,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
 
     // Before removing the metrics remove this table from the aggregate table bean.
     // This should mean that it's unlikely that snapshot and close happen at the same time.
-    agg.deregister(tableName.getNameAsString());
+    agg.deleteTableSource(tableName.getNameAsString());
 
     // While it's unlikely that snapshot and close happen at the same time, it's still possible.
     // So grab the lock to ensure that all calls to snapshot are done before we remove the metrics
@@ -78,6 +226,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing table Metrics for table ");
       }
+      deregisterMetrics();
       tableWrapperAgg = null;
     }
   }
@@ -122,24 +271,52 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
       }
 
       if (this.tableWrapperAgg != null) {
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.READ_REQUEST_COUNT,
-          MetricsTableSource.READ_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getReadRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.WRITE_REQUEST_COUNT,
-          MetricsTableSource.WRITE_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getWriteRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.TOTAL_REQUEST_COUNT,
-          MetricsTableSource.TOTAL_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
-        mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.MEMSTORE_SIZE,
-          MetricsTableSource.MEMSTORE_SIZE_DESC),
-          tableWrapperAgg.getMemStoresSize(tableName.getNameAsString()));
-        mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.STORE_FILE_SIZE,
-          MetricsTableSource.STORE_FILE_SIZE_DESC),
-          tableWrapperAgg.getStoreFilesSize(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.READ_REQUEST_COUNT,
+            MetricsRegionServerSource.READ_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getReadRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(
+            Interns.info(tableNamePrefix + MetricsRegionServerSource.FILTERED_READ_REQUEST_COUNT,
+                MetricsRegionServerSource.FILTERED_READ_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getFilteredReadRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
+            MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getWriteRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.TOTAL_REQUEST_COUNT,
+            MetricsRegionServerSource.TOTAL_REQUEST_COUNT_DESC),
+            tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MEMSTORE_SIZE,
+            MetricsRegionServerSource.MEMSTORE_SIZE_DESC),
+            tableWrapperAgg.getMemStoreSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STOREFILE_COUNT,
+            MetricsRegionServerSource.STOREFILE_COUNT_DESC),
+            tableWrapperAgg.getNumStoreFiles(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STOREFILE_SIZE,
+            MetricsRegionServerSource.STOREFILE_SIZE_DESC),
+            tableWrapperAgg.getStoreFileSize(tableName.getNameAsString()));
         mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.TABLE_SIZE,
           MetricsTableSource.TABLE_SIZE_DESC),
           tableWrapperAgg.getTableSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.AVERAGE_REGION_SIZE,
+            MetricsRegionServerSource.AVERAGE_REGION_SIZE_DESC),
+            tableWrapperAgg.getAvgRegionSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.REGION_COUNT,
+            MetricsRegionServerSource.REGION_COUNT_DESC),
+            tableWrapperAgg.getNumRegions(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STORE_COUNT,
+            MetricsRegionServerSource.STORE_COUNT_DESC),
+            tableWrapperAgg.getNumStores(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MAX_STORE_FILE_AGE,
+            MetricsRegionServerSource.MAX_STORE_FILE_AGE_DESC),
+            tableWrapperAgg.getMaxStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MIN_STORE_FILE_AGE,
+            MetricsRegionServerSource.MIN_STORE_FILE_AGE_DESC),
+            tableWrapperAgg.getMinStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.AVG_STORE_FILE_AGE,
+            MetricsRegionServerSource.AVG_STORE_FILE_AGE_DESC),
+            tableWrapperAgg.getAvgStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES,
+            MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC),
+            tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString()));
       }
     }
   }
@@ -174,4 +351,80 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   public String getTableNamePrefix() {
     return tableNamePrefix;
   }
+
+  @Override
+  public void incrSplitRequest() {
+    splitRequest.incr();
+  }
+
+  @Override
+  public void incrSplitSuccess() {
+    splitSuccess.incr();
+  }
+
+  @Override
+  public void updateSplitTime(long t) {
+    splitTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFlushTime(long t) {
+    flushTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFlushMemstoreSize(long bytes) {
+    flushMemstoreSizeHisto.add(bytes);
+    flushedMemstoreBytes.incr(bytes);
+  }
+
+  @Override
+  public void updateFlushOutputSize(long bytes) {
+    flushOutputSizeHisto.add(bytes);
+    flushedOutputBytes.incr(bytes);
+  }
+
+  @Override
+  public void updateCompactionTime(boolean isMajor, long t) {
+    compactionTimeHisto.add(t);
+    if (isMajor) {
+      majorCompactionTimeHisto.add(t);
+    }
+  }
+
+  @Override
+  public void updateCompactionInputFileCount(boolean isMajor, long c) {
+    compactionInputFileCountHisto.add(c);
+    if (isMajor) {
+      majorCompactionInputFileCountHisto.add(c);
+    }
+  }
+
+  @Override
+  public void updateCompactionInputSize(boolean isMajor, long bytes) {
+    compactionInputSizeHisto.add(bytes);
+    compactedInputBytes.incr(bytes);
+    if (isMajor) {
+      majorCompactionInputSizeHisto.add(bytes);
+      majorCompactedInputBytes.incr(bytes);
+    }
+  }
+
+  @Override
+  public void updateCompactionOutputFileCount(boolean isMajor, long c) {
+    compactionOutputFileCountHisto.add(c);
+    if (isMajor) {
+      majorCompactionOutputFileCountHisto.add(c);
+    }
+  }
+
+  @Override
+  public void updateCompactionOutputSize(boolean isMajor, long bytes) {
+    compactionOutputSizeHisto.add(bytes);
+    compactedOutputBytes.incr(bytes);
+    if (isMajor) {
+      majorCompactionOutputSizeHisto.add(bytes);
+      majorCompactedOutputBytes.incr(bytes);
+    }
+  }
 }
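
For orientation, a minimal sketch of driving the per-table source added above. It
assumes getOrCreateTableSource() (added to the aggregate source earlier in this
patch) registers the source's metrics before returning, and uses the new
MetricsTableWrapperStub from the test changes below; the table name is illustrative.

    MetricsTableAggregateSource tableAgg = CompatibilitySingletonFactory
        .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
    MetricsTableSource src =
        tableAgg.getOrCreateTableSource("mytable", new MetricsTableWrapperStub("mytable"));
    // Published under the Namespace_<ns>_table_<table>_metric_ prefix built in the
    // constructor, e.g. Namespace_default_table_mytable_metric_flushTime.
    src.updateFlushTime(100);
    src.updateCompactionInputSize(true, 1024);  // also feeds the majorCompaction* metrics
    src.incrSplitRequest();
    src.close();                                // deregisters the table's metrics again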

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
new file mode 100644
index 0000000..b9d3d1f
--- /dev/null
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
+
+  private String tableName;
+
+  public MetricsTableWrapperStub(String tableName) {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public long getReadRequestCount(String table) {
+    return 10;
+  }
+
+  @Override
+  public long getWriteRequestCount(String table) {
+    return 20;
+  }
+
+  @Override
+  public long getTotalRequestsCount(String table) {
+    return 30;
+  }
+
+  @Override
+  public long getFilteredReadRequestCount(String table) {
+    return 40;
+  }
+
+  @Override
+  public long getMemStoreSize(String table) {
+    return 1000;
+  }
+
+  @Override
+  public long getStoreFileSize(String table) {
+    return 2000;
+  }
+
+  @Override
+  public long getTableSize(String table) {
+    return 3000;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public long getNumRegions(String table) {
+    return 11;
+  }
+
+  @Override
+  public long getNumStores(String table) {
+    return 22;
+  }
+
+  @Override
+  public long getNumStoreFiles(String table) {
+    return 33;
+  }
+
+  @Override
+  public long getMaxStoreFileAge(String table) {
+    return 44;
+  }
+
+  @Override
+  public long getMinStoreFileAge(String table) {
+    return 55;
+  }
+
+  @Override
+  public long getAvgStoreFileAge(String table) {
+    return 66;
+  }
+
+  @Override
+  public long getNumReferenceFiles(String table) {
+    return 77;
+  }
+
+  @Override
+  public long getAvgRegionSize(String table) {
+    return 88;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
index a22c7a5..82bfc0b 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
@@ -196,5 +196,10 @@ public class TestMetricsRegionSourceImpl {
     public long getMaxFlushQueueSize() {
       return 0;
     }
+
+    @Override
+    public long getTotalRequestCount() {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
index 25fe532..11177ed 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
@@ -44,10 +44,13 @@ public class TestMetricsTableSourceImpl {
     MetricsRegionServerSourceFactory metricsFact =
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
 
-    MetricsTableSource one = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
-    MetricsTableSource oneClone = metricsFact.createTable("ONETABLE",
-            new TableWrapperStub("ONETABLE"));
-    MetricsTableSource two = metricsFact.createTable("TWOTABLE", new TableWrapperStub("TWOTABLE"));
+    MetricsTableSource one = metricsFact.createTable(
+        "ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
+    MetricsTableSource oneClone = metricsFact.createTable(
+        "ONETABLE",
+            new MetricsTableWrapperStub("ONETABLE"));
+    MetricsTableSource two = metricsFact.createTable(
+        "TWOTABLE", new MetricsTableWrapperStub("TWOTABLE"));
 
     assertEquals(0, one.compareTo(oneClone));
     assertEquals(one.hashCode(), oneClone.hashCode());
@@ -70,49 +73,8 @@ public class TestMetricsTableSourceImpl {
   public void testGetTableMetrics() {
     MetricsTableSource oneTbl =
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-        .createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
+        .createTable("ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
     assertEquals("ONETABLE", oneTbl.getTableName());
   }
 
-  static class TableWrapperStub implements MetricsTableWrapperAggregate {
-    private String tableName;
-
-    public TableWrapperStub(String tableName) {
-      this.tableName = tableName;
-    }
-
-    @Override
-    public long getReadRequestsCount(String table) {
-      return 10;
-    }
-
-    @Override
-    public long getWriteRequestsCount(String table) {
-      return 20;
-    }
-
-    @Override
-    public long getTotalRequestsCount(String table) {
-      return 30;
-    }
-
-    @Override
-    public long getMemStoresSize(String table) {
-      return 1000;
-    }
-
-    @Override
-    public long getStoreFilesSize(String table) {
-      return 2000;
-    }
-
-    @Override
-    public long getTableSize(String table) {
-      return 3000;
-    }
-
-    public String getTableName() {
-      return tableName;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index cf3cde3..eaa2eea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2783,7 +2783,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus(msg);
 
     if (rsServices != null && rsServices.getMetrics() != null) {
-      rsServices.getMetrics().updateFlush(time - startTime,
+      rsServices.getMetrics().updateFlush(getTableDescriptor().getTableName().getNameAsString(),
+          time - startTime,
           mss.getDataSize(), flushedOutputFileSize);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5cd6499..994cdb6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1505,9 +1505,9 @@ public class HRegionServer extends HasThread implements
       // This call sets up an initialized replication and WAL. Later we start it up.
       setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
-      this.metricsRegionServer = new MetricsRegionServer(
-          new MetricsRegionServerWrapperImpl(this), conf);
       this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
+      this.metricsRegionServer = new MetricsRegionServer(
+          new MetricsRegionServerWrapperImpl(this), conf, metricsTable);
       // Now that we have a metrics source, start the pause monitor
       this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
       pauseMonitor.start();

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ea699ea..314547e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1427,9 +1427,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     long now = EnvironmentEdgeManager.currentTime();
     if (region.getRegionServerServices() != null
         && region.getRegionServerServices().getMetrics() != null) {
-      region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
-          now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
-          outputBytes);
+      region.getRegionServerServices().getMetrics().updateCompaction(
+          region.getTableDescriptor().getTableName().getNameAsString(),
+          cr.isMajor(), now - compactionStartTime, cr.getFiles().size(),
+          newFiles.size(), cr.getSize(), outputBytes);
+
     }
 
     logCompactionEndMessage(cr, sfs, now, compactionStartTime);

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index df50fa8..e6f65e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -46,15 +46,18 @@ public class MetricsRegionServer {
   private MetricsRegionServerSource serverSource;
   private MetricsRegionServerWrapper regionServerWrapper;
   private RegionServerTableMetrics tableMetrics;
+  private final MetricsTable metricsTable;
 
   private MetricRegistry metricRegistry;
   private Timer bulkLoadTimer;
 
-  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf) {
+  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf,
+      MetricsTable metricsTable) {
     this(regionServerWrapper,
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
             .createServer(regionServerWrapper),
-        createTableMetrics(conf));
+        createTableMetrics(conf),
+        metricsTable);
 
     // Create hbase-metrics module based metrics. The registry should already be registered by the
     // MetricsRegionServerSource
@@ -66,10 +69,12 @@ public class MetricsRegionServer {
 
   MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
                       MetricsRegionServerSource serverSource,
-                      RegionServerTableMetrics tableMetrics) {
+                      RegionServerTableMetrics tableMetrics,
+                      MetricsTable metricsTable) {
     this.regionServerWrapper = regionServerWrapper;
     this.serverSource = serverSource;
     this.tableMetrics = tableMetrics;
+    this.metricsTable = metricsTable;
   }
 
   /**
@@ -193,19 +198,34 @@ public class MetricsRegionServer {
     serverSource.incrSplitSuccess();
   }
 
-  public void updateFlush(long t, long memstoreSize, long fileSize) {
+  public void updateFlush(String table, long t, long memstoreSize, long fileSize) {
     serverSource.updateFlushTime(t);
     serverSource.updateFlushMemStoreSize(memstoreSize);
     serverSource.updateFlushOutputSize(fileSize);
+
+    if (table != null) {
+      metricsTable.updateFlushTime(table, t);
+      metricsTable.updateFlushMemstoreSize(table, memstoreSize);
+      metricsTable.updateFlushOutputSize(table, fileSize);
+    }
+
   }
 
-  public void updateCompaction(boolean isMajor, long t, int inputFileCount, int outputFileCount,
+  public void updateCompaction(String table, boolean isMajor, long t, int inputFileCount, int outputFileCount,
       long inputBytes, long outputBytes) {
     serverSource.updateCompactionTime(isMajor, t);
     serverSource.updateCompactionInputFileCount(isMajor, inputFileCount);
     serverSource.updateCompactionOutputFileCount(isMajor, outputFileCount);
     serverSource.updateCompactionInputSize(isMajor, inputBytes);
     serverSource.updateCompactionOutputSize(isMajor, outputBytes);
+
+    if (table != null) {
+      metricsTable.updateCompactionTime(table, isMajor, t);
+      metricsTable.updateCompactionInputFileCount(table, isMajor, inputFileCount);
+      metricsTable.updateCompactionOutputFileCount(table, isMajor, outputFileCount);
+      metricsTable.updateCompactionInputSize(table, isMajor, inputBytes);
+      metricsTable.updateCompactionOutputSize(table, isMajor, outputBytes);
+    }
   }
 
   public void updateBulkLoad(long millis) {
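
With this change the flush and compaction updates take the table name first and fan
out to the per-table metrics as well; passing null keeps only the server-level
metrics, as the updated TestMetricsRegionServer does. A rough sketch (the
metricsRegionServer instance is assumed to be the one built with a MetricsTable, as
in the HRegionServer change above; values are illustrative):

    metricsRegionServer.updateFlush("mytable", /* timeMs */ 100, /* memstoreBytes */ 1024,
        /* outputFileBytes */ 2048);
    metricsRegionServer.updateCompaction("mytable", /* isMajor */ false, /* timeMs */ 5000,
        /* inputFiles */ 10, /* outputFiles */ 2, /* inputBytes */ 1048576L,
        /* outputBytes */ 524288L);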

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 533a05d..c073ef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -162,6 +162,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
   }
 
   @Override
+  public long getTotalRequestCount() {
+    return getReadRequestCount() + getWriteRequestCount();
+  }
+
+  @Override
   public long getNumCompactionsFailed() {
     return this.region.compactionsFailed.sum();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
index b853c75..a3f0dff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
@@ -24,19 +24,67 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class MetricsTable {
   private final MetricsTableAggregateSource tableSourceAgg;
-  private MetricsTableWrapperAggregate tableWrapperAgg;
+  private MetricsTableWrapperAggregate wrapper;
 
   public MetricsTable(final MetricsTableWrapperAggregate wrapper) {
     tableSourceAgg = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
                                              .getTableAggregate();
-    this.tableWrapperAgg = wrapper;
+    this.wrapper = wrapper;
   }
 
   public MetricsTableWrapperAggregate getTableWrapperAgg() {
-    return tableWrapperAgg;
+    return wrapper;
   }
 
   public MetricsTableAggregateSource getTableSourceAgg() {
     return tableSourceAgg;
   }
+
+  public void incrSplitRequest(String table) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).incrSplitRequest();
+  }
+
+  public void incrSplitSuccess(String table) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).incrSplitSuccess();
+  }
+
+  public void updateSplitTime(String table, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateSplitTime(t);
+  }
+
+  public void updateFlushTime(String table, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushTime(t);
+  }
+
+  public void updateFlushMemstoreSize(String table, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushMemstoreSize(bytes);
+  }
+
+  public void updateFlushOutputSize(String table, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushOutputSize(bytes);
+  }
+
+  public void updateCompactionTime(String table, boolean isMajor, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateCompactionTime(isMajor, t);
+  }
+
+  public void updateCompactionInputFileCount(String table, boolean isMajor, long c) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionInputFileCount(isMajor, c);
+  }
+
+  public void updateCompactionInputSize(String table, boolean isMajor, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionInputSize(isMajor, bytes);
+  }
+
+  public void updateCompactionOutputFileCount(String table, boolean isMajor, long c) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionOutputFileCount(isMajor, c);
+  }
+
+  public void updateCompactionOutputSize(String table, boolean isMajor, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionOutputSize(isMajor, bytes);
+  }
 }
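
MetricsTable is the region-server-side facade for the per-table sinks: every update
resolves the table's source lazily through getOrCreateTableSource(), so the first
update for a table creates its metrics. A minimal sketch, assuming the same wiring
as the HRegionServer change above (regionServer is the enclosing HRegionServer and
the table name is illustrative):

    MetricsTable metricsTable =
        new MetricsTable(new MetricsTableWrapperAggregateImpl(regionServer));
    metricsTable.incrSplitRequest("mytable");
    metricsTable.updateSplitTime("mytable", 250);
    metricsTable.updateCompactionInputFileCount("mytable", /* isMajor */ true, 12);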

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
index f06f747..56b7598 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
@@ -43,7 +43,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
   private Runnable runnable;
   private long period;
   private ScheduledFuture<?> tableMetricsUpdateTask;
-  private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap
+    = new ConcurrentHashMap<>();
 
   public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
     this.regionServer = regionServer;
@@ -51,8 +52,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000;
     this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
     this.runnable = new TableMetricsWrapperRunnable();
-    this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, this.period,
-      TimeUnit.MILLISECONDS);
+    this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period,
+      this.period, TimeUnit.MILLISECONDS);
   }
 
   public class TableMetricsWrapperRunnable implements Runnable {
@@ -62,33 +63,43 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
 
       for (Region r : regionServer.getOnlineRegionsLocalContext()) {
-        TableName tbl= r.getTableDescriptor().getTableName();
-        MetricsTableValues metricsTable = localMetricsTableMap.get(tbl);
-        if (metricsTable == null) {
-          metricsTable = new MetricsTableValues();
-          localMetricsTableMap.put(tbl, metricsTable);
+        TableName tbl = r.getTableDescriptor().getTableName();
+        MetricsTableValues mt = localMetricsTableMap.get(tbl);
+        if (mt == null) {
+          mt = new MetricsTableValues();
+          localMetricsTableMap.put(tbl, mt);
         }
-        long tempStorefilesSize = 0;
-        for (Store store : r.getStores()) {
-          tempStorefilesSize += store.getStorefilesSize();
+        if (r.getStores() != null) {
+          for (Store store : r.getStores()) {
+            mt.storeFileCount += store.getStorefilesCount();
+            mt.memstoreSize += (store.getMemStoreSize().getDataSize() +
+              store.getMemStoreSize().getHeapSize() + store.getMemStoreSize().getOffHeapSize());
+            mt.storeFileSize += store.getStorefilesSize();
+            mt.referenceFileCount += store.getNumReferenceFiles();
+
+            mt.maxStoreFileAge = Math.max(mt.maxStoreFileAge, store.getMaxStoreFileAge().getAsLong());
+            mt.minStoreFileAge = Math.min(mt.minStoreFileAge, store.getMinStoreFileAge().getAsLong());
+            mt.totalStoreFileAge += (long)store.getAvgStoreFileAge().getAsDouble() *
+                store.getStorefilesCount();
+            mt.storeCount += 1;
+          }
+          mt.regionCount += 1;
+
+          mt.readRequestCount += r.getReadRequestsCount();
+          mt.filteredReadRequestCount += getFilteredReadRequestCount(tbl.getNameAsString());
+          mt.writeRequestCount += r.getWriteRequestsCount();
+
         }
-        metricsTable.setMemStoresSize(metricsTable.getMemStoresSize() + r.getMemStoreDataSize());
-        metricsTable.setStoreFilesSize(metricsTable.getStoreFilesSize() + tempStorefilesSize);
-        metricsTable.setTableSize(metricsTable.getMemStoresSize() + metricsTable.getStoreFilesSize());
-        metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount());
-        metricsTable.setWriteRequestsCount(metricsTable.getWriteRequestsCount() + r.getWriteRequestsCount());
-        metricsTable.setTotalRequestsCount(metricsTable.getReadRequestsCount() + metricsTable.getWriteRequestsCount());
       }
 
-      for(Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
+      for (Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
         TableName tbl = entry.getKey();
         if (metricsTableMap.get(tbl) == null) {
-          MetricsTableSource tableSource = CompatibilitySingletonFactory
-              .getInstance(MetricsRegionServerSourceFactory.class).createTable(tbl.getNameAsString(),
-                MetricsTableWrapperAggregateImpl.this);
+          // this registers a table source backed by this wrapper with the aggregate source
           CompatibilitySingletonFactory
-          .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate()
-          .register(tbl.getNameAsString(), tableSource);
+              .getInstance(MetricsRegionServerSourceFactory.class)
+              .getTableAggregate()
+              .getOrCreateTableSource(tbl.getNameAsString(), MetricsTableWrapperAggregateImpl.this);
         }
         metricsTableMap.put(entry.getKey(), entry.getValue());
       }
@@ -97,7 +108,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       MetricsTableAggregateSource agg = CompatibilitySingletonFactory
           .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
       for (TableName table : existingTableNames) {
-        agg.deregister(table.getNameAsString());
+        agg.deleteTableSource(table.getNameAsString());
         if (metricsTableMap.get(table) != null) {
           metricsTableMap.remove(table);
         }
@@ -106,120 +117,176 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
   }
 
   @Override
-  public long getReadRequestsCount(String table) {
+  public long getReadRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getReadRequestsCount();
+    } else {
+      return metricsTable.readRequestCount;
+    }
   }
 
-  @Override
-  public long getWriteRequestsCount(String table) {
+  public long getFilteredReadRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getWriteRequestsCount();
+    }
+    return metricsTable.filteredReadRequestCount;
   }
 
   @Override
-  public long getTotalRequestsCount(String table) {
+  public long getWriteRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getTotalRequestsCount();
+    } else {
+      return metricsTable.writeRequestCount;
+    }
   }
 
   @Override
-  public long getMemStoresSize(String table) {
+  public long getTotalRequestsCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getMemStoresSize();
+    } else {
+      return metricsTable.readRequestCount + metricsTable.writeRequestCount;
+    }
   }
 
   @Override
-  public long getStoreFilesSize(String table) {
+  public long getMemStoreSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getStoreFilesSize();
+    } else {
+      return metricsTable.memstoreSize;
+    }
   }
 
   @Override
-  public long getTableSize(String table) {
+  public long getStoreFileSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getTableSize();
+    } else {
+      return metricsTable.storeFileSize;
+    }
   }
 
   @Override
-  public void close() throws IOException {
-    tableMetricsUpdateTask.cancel(true);
+  public long getTableSize(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
+    } else {
+      return metricsTable.memstoreSize + metricsTable.storeFileSize;
+    }
   }
 
-  private static class MetricsTableValues {
-
-    private long totalRequestsCount;
-    private long readRequestsCount;
-    private long writeRequestsCount;
-    private long memstoresSize;
-    private long storeFilesSize;
-    private long tableSize;
-
-    public long getTotalRequestsCount() {
-      return totalRequestsCount;
+  public long getNumRegions(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.regionCount;
+  }
 
-    public void setTotalRequestsCount(long totalRequestsCount) {
-      this.totalRequestsCount = totalRequestsCount;
+  @Override
+  public long getNumStores(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.storeCount;
+  }
 
-    public long getReadRequestsCount() {
-      return readRequestsCount;
+  @Override
+  public long getNumStoreFiles(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.storeFileCount;
+  }
 
-    public void setReadRequestsCount(long readRequestsCount) {
-      this.readRequestsCount = readRequestsCount;
+  @Override
+  public long getMaxStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.maxStoreFileAge;
+  }
 
-    public long getWriteRequestsCount() {
-      return writeRequestsCount;
+  @Override
+  public long getMinStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.minStoreFileAge == Long.MAX_VALUE ? 0 : metricsTable.minStoreFileAge;
+  }
 
-    public void setWriteRequestsCount(long writeRequestsCount) {
-      this.writeRequestsCount = writeRequestsCount;
+  @Override
+  public long getAvgStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
 
-    public long getMemStoresSize() {
-      return memstoresSize;
-    }
+    return metricsTable.storeFileCount == 0
+        ? 0
+        : (metricsTable.totalStoreFileAge / metricsTable.storeFileCount);
+  }
 
-    public void setMemStoresSize(long memstoresSize) {
-      this.memstoresSize = memstoresSize;
+  @Override
+  public long getNumReferenceFiles(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.referenceFileCount;
+  }
 
-    public long getStoreFilesSize() {
-      return storeFilesSize;
+  @Override
+  public long getAvgRegionSize(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.regionCount == 0
+        ? 0
+        : (metricsTable.memstoreSize + metricsTable.storeFileSize) / metricsTable.regionCount;
+  }
 
-    public void setStoreFilesSize(long storeFilesSize) {
-      this.storeFilesSize = storeFilesSize;
+  public long getCpRequestCount(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.cpRequestCount;
+  }
 
-    public long getTableSize() {
-      return tableSize;
-    }
+  @Override
+  public void close() throws IOException {
+    tableMetricsUpdateTask.cancel(true);
+  }
 
-    public void setTableSize(long tableSize) {
-      this.tableSize = tableSize;
-    }
+  private static class MetricsTableValues {
+    long readRequestCount;
+    long filteredReadRequestCount;
+    long writeRequestCount;
+    long memstoreSize;
+    long regionCount;
+    long storeCount;
+    long storeFileCount;
+    long storeFileSize;
+    long maxStoreFileAge;
+    long minStoreFileAge = Long.MAX_VALUE;
+    long totalStoreFileAge;
+    long referenceFileCount;
+    long cpRequestCount;
   }
 
 }
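
The wrapper above recomputes the per-table totals from the online regions once per
region-server metrics period (plus one second), and every getter falls back to 0
until the first pass has populated metricsTableMap. A rough sketch of reading it,
assuming regionServer is the enclosing HRegionServer and at least one period has
elapsed:

    MetricsTableWrapperAggregate wrapper = new MetricsTableWrapperAggregateImpl(regionServer);
    long tableSize = wrapper.getTableSize("mytable");         // memstoreSize + storeFileSize
    long avgRegionSize = wrapper.getAvgRegionSize("mytable"); // table size / regionCount
    long minAge = wrapper.getMinStoreFileAge("mytable");      // 0 while no store files exist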

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
index 524d03b..1b1d0bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
@@ -162,4 +162,9 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
   public long getMaxFlushQueueSize() {
     return 6;
   }
+
+  @Override
+  public long getTotalRequestCount() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
deleted file mode 100644
index ba333a5..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
-
-  private String tableName;
-
-  public MetricsTableWrapperStub(String tableName) {
-    this.tableName = tableName;
-  }
-
-  @Override
-  public long getReadRequestsCount(String table) {
-    return 10;
-  }
-
-  @Override
-  public long getWriteRequestsCount(String table) {
-    return 20;
-  }
-
-  @Override
-  public long getTotalRequestsCount(String table) {
-    return 30;
-  }
-
-  @Override
-  public long getMemStoresSize(String table) {
-    return 1000;
-  }
-
-  @Override
-  public long getStoreFilesSize(String table) {
-    return 2000;
-  }
-
-  @Override
-  public long getTableSize(String table) {
-    return 3000;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index bfc0b63..4bf71c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -57,7 +57,7 @@ public class TestMetricsRegionServer {
   @Before
   public void setUp() {
     wrapper = new MetricsRegionServerWrapperStub();
-    rsm = new MetricsRegionServer(wrapper, new Configuration(false));
+    rsm = new MetricsRegionServer(wrapper, new Configuration(false), null);
     serverSource = rsm.getMetricsSource();
   }
 
@@ -184,14 +184,14 @@ public class TestMetricsRegionServer {
 
   @Test
   public void testFlush() {
-    rsm.updateFlush(1, 2, 3);
+    rsm.updateFlush(null, 1, 2, 3);
     HELPER.assertCounter("flushTime_num_ops", 1, serverSource);
     HELPER.assertCounter("flushMemstoreSize_num_ops", 1, serverSource);
     HELPER.assertCounter("flushOutputSize_num_ops", 1, serverSource);
     HELPER.assertCounter("flushedMemstoreBytes", 2, serverSource);
     HELPER.assertCounter("flushedOutputBytes", 3, serverSource);
 
-    rsm.updateFlush(10, 20, 30);
+    rsm.updateFlush(null, 10, 20, 30);
     HELPER.assertCounter("flushTimeNumOps", 2, serverSource);
     HELPER.assertCounter("flushMemstoreSize_num_ops", 2, serverSource);
     HELPER.assertCounter("flushOutputSize_num_ops", 2, serverSource);
@@ -201,7 +201,7 @@ public class TestMetricsRegionServer {
 
   @Test
   public void testCompaction() {
-    rsm.updateCompaction(false, 1, 2, 3, 4, 5);
+    rsm.updateCompaction(null, false, 1, 2, 3, 4, 5);
     HELPER.assertCounter("compactionTime_num_ops", 1, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 1, serverSource);
     HELPER.assertCounter("compactionInputSize_num_ops", 1, serverSource);
@@ -209,7 +209,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("compactedInputBytes", 4, serverSource);
     HELPER.assertCounter("compactedoutputBytes", 5, serverSource);
 
-    rsm.updateCompaction(false, 10, 20, 30, 40, 50);
+    rsm.updateCompaction(null, false, 10, 20, 30, 40, 50);
     HELPER.assertCounter("compactionTime_num_ops", 2, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 2, serverSource);
     HELPER.assertCounter("compactionInputSize_num_ops", 2, serverSource);
@@ -218,7 +218,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("compactedoutputBytes", 55, serverSource);
 
     // do major compaction
-    rsm.updateCompaction(true, 100, 200, 300, 400, 500);
+    rsm.updateCompaction(null, true, 100, 200, 300, 400, 500);
 
     HELPER.assertCounter("compactionTime_num_ops", 3, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 3, serverSource);

http://git-wip-us.apache.org/repos/asf/hbase/blob/72a29211/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
index e3bac65..b19d4b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompatibilityFactory;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -33,26 +35,108 @@ public class TestMetricsTableAggregate {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
+    HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
 
   public static MetricsAssertHelper HELPER =
-      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+  private String tableName = "testTableMetrics";
+  private String pre = "Namespace_default_table_" + tableName + "_metric_";
+
+  private MetricsTableWrapperStub tableWrapper;
+  private MetricsTable mt;
+  private MetricsRegionServerWrapper rsWrapper;
+  private MetricsRegionServer rsm;
+  private MetricsTableAggregateSource agg;
+
+  @BeforeClass
+  public static void classSetUp() {
+    HELPER.init();
+  }
+
+  @Before
+  public void setUp() {
+    tableWrapper = new MetricsTableWrapperStub(tableName);
+    mt = new MetricsTable(tableWrapper);
+    rsWrapper = new MetricsRegionServerWrapperStub();
+    Configuration conf = new Configuration();
+    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
+    agg = mt.getTableSourceAgg();
+  }
 
   @Test
-  public void testTableWrapperAggregateMetrics() throws IOException {
-    String tableName = "testTableMetrics";
-    MetricsTableWrapperStub tableWrapper = new MetricsTableWrapperStub(tableName);
-    CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-    .createTable(tableName, tableWrapper);
-    MetricsTableAggregateSource agg = CompatibilitySingletonFactory
-        .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
-
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_readRequestCount", 10, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_writeRequestCount", 20, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_totalRequestCount", 30, agg);
-
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_memstoreSize", 1000, agg);
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_storeFileSize", 2000, agg);
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_tableSize", 3000, agg);
+  public void testRequestMetrics() throws IOException {
+    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
+    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
+    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
   }
+
+  @Test
+  public void testRegionAndStoreMetrics() throws IOException {
+    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
+    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
+    HELPER.assertGauge(pre + "tableSize", 3000, agg);
+
+    HELPER.assertGauge(pre + "regionCount", 11, agg);
+    HELPER.assertGauge(pre + "storeCount", 22, agg);
+    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
+    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
+    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
+    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
+    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
+    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
+  }
+
+  @Test
+  public void testFlush() {
+    rsm.updateFlush(tableName, 1, 2, 3);
+    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
+    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
+
+    rsm.updateFlush(tableName, 10, 20, 30);
+    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
+    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
+  }
+
+  @Test
+  public void testCompaction() {
+    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
+
+    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
+
+    // do major compaction
+    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
+
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
+
+    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
+    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
+  }
+
 }
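For reference, the prefix "pre" asserted throughout this test follows the per-table metric naming scheme visible in the removed assertions above. A minimal sketch, assuming the test table is named "testTableMetrics" in the "default" namespace (the literal below is inferred from the removed assertion names, not necessarily the exact test code):

    // Hypothetical definition of the per-table metric prefix used in the assertions above.
    String pre = "Namespace_default_table_testTableMetrics_metric_";
    // e.g. pre + "readRequestCount" -> "Namespace_default_table_testTableMetrics_metric_readRequestCount"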


[3/3] hbase git commit: HBASE-15728 Add Per-Table metrics back

Posted by ap...@apache.org.
HBASE-15728 Add Per-Table metrics back

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fb74f215
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fb74f215
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fb74f215

Branch: refs/heads/branch-1
Commit: fb74f215b453ba3ecebe1254a9b56602b1537110
Parents: 492d69d
Author: Xu Cang <xc...@salesforce.com>
Authored: Tue Aug 28 17:27:22 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 29 16:36:45 2018 -0700

----------------------------------------------------------------------
 .../regionserver/MetricsRegionServerSource.java |   3 +
 .../regionserver/MetricsRegionWrapper.java      |   2 +
 .../MetricsTableAggregateSource.java            |   8 +-
 .../hbase/regionserver/MetricsTableSource.java  |  88 +++++-
 .../MetricsTableWrapperAggregate.java           |  59 +++-
 .../MetricsRegionServerSourceImpl.java          |   3 +-
 .../MetricsTableAggregateSourceImpl.java        |  43 ++-
 .../regionserver/MetricsTableSourceImpl.java    | 285 +++++++++++++++++--
 .../regionserver/MetricsTableWrapperStub.java   | 107 +++++++
 .../TestMetricsRegionSourceImpl.java            |   5 +
 .../TestMetricsTableSourceImpl.java             |  54 +---
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +-
 .../hbase/regionserver/HRegionServer.java       |   2 +-
 .../hadoop/hbase/regionserver/HStore.java       |   9 +-
 .../hbase/regionserver/MetricsRegionServer.java |  31 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   5 +
 .../hadoop/hbase/regionserver/MetricsTable.java |  54 +++-
 .../MetricsTableWrapperAggregateImpl.java       | 224 +++++++++------
 .../regionserver/MetricsRegionWrapperStub.java  |   5 +
 .../regionserver/MetricsTableWrapperStub.java   |  62 ----
 .../regionserver/TestMetricsRegionServer.java   |  12 +-
 .../regionserver/TestMetricsTableAggregate.java | 117 ++++++--
 22 files changed, 902 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index affd267..5feb241 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -248,6 +248,9 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
   String TOTAL_ROW_ACTION_REQUEST_COUNT_DESC =
       "Total number of region requests this RegionServer has answered, count by row-level action";
   String READ_REQUEST_COUNT = "readRequestCount";
+  String FILTERED_READ_REQUEST_COUNT = "filteredReadRequestCount";
+  String FILTERED_READ_REQUEST_COUNT_DESC =
+      "Number of read requests this region server has answered.";
   String READ_REQUEST_COUNT_DESC =
       "Number of read requests this region server has answered.";
   String WRITE_REQUEST_COUNT = "writeRequestCount";

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
index 9a725cd..850f82c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java
@@ -94,6 +94,8 @@ public interface MetricsRegionWrapper {
    */
   long getWriteRequestCount();
 
+  long getTotalRequestCount();
+
   long getNumFilesCompacted();
 
   long getNumBytesCompacted();

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
index 6ee0c3d..988d945 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSource.java
@@ -50,18 +50,16 @@ public interface MetricsTableAggregateSource extends BaseSource {
   String NUMBER_OF_TABLES_DESC = "Number of tables in the metrics system";
 
   /**
-   * Register a MetricsTableSource as being open.
-   *
+   * Returns MetricsTableSource registered for the table. Creates one if not defined.
    * @param table The table name
-   * @param source the source for the table being opened.
    */
-  void register(String table, MetricsTableSource source);
+  MetricsTableSource getOrCreateTableSource(String table, MetricsTableWrapperAggregate wrapper);
 
   /**
    * Remove a table's source. This is called when regions of a table are closed.
    *
    * @param table The table name
    */
-  void deregister(String table);
+  void deleteTableSource(String table);
 
 }
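A minimal usage sketch of the reworked aggregate-source API, assuming "agg" is a MetricsTableAggregateSource and "wrapper" is some MetricsTableWrapperAggregate implementation (both names are illustrative):

    // Look up (or lazily create and register) the per-table source, then report into it.
    MetricsTableSource src = agg.getOrCreateTableSource("usertable", wrapper);
    src.updateFlushTime(120L);          // a 120 ms flush for this table
    // When the last region of the table closes, drop the source and its registered metrics.
    agg.deleteTableSource("usertable");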

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
index 7d1f5d0..0446ee5 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSource.java
@@ -18,22 +18,14 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.Closeable;
+
 /**
  * This interface will be implemented to allow region server to push table metrics into
  * MetricsRegionAggregateSource that will in turn push data to the Hadoop metrics system.
  */
-public interface MetricsTableSource extends Comparable<MetricsTableSource> {
-
-  String READ_REQUEST_COUNT = "readRequestCount";
-  String READ_REQUEST_COUNT_DESC = "Number fo read requests";
-  String WRITE_REQUEST_COUNT = "writeRequestCount";
-  String WRITE_REQUEST_COUNT_DESC = "Number fo write requests";
-  String TOTAL_REQUEST_COUNT = "totalRequestCount";
-  String TOTAL_REQUEST_COUNT_DESC = "Number fo total requests";
-  String MEMSTORE_SIZE = "memstoreSize";
-  String MEMSTORE_SIZE_DESC = "The size of memory stores";
-  String STORE_FILE_SIZE = "storeFileSize";
-  String STORE_FILE_SIZE_DESC = "The size of store files size";
+public interface MetricsTableSource extends Comparable<MetricsTableSource>, Closeable {
+
   String TABLE_SIZE = "tableSize";
   String TABLE_SIZE_DESC = "Total size of the table in the region server";
 
@@ -42,11 +34,83 @@ public interface MetricsTableSource extends Comparable<MetricsTableSource> {
   /**
    * Close the table's metrics as all the region are closing.
    */
+  @Override
   void close();
 
+  void registerMetrics();
+
   /**
    * Get the aggregate source to which this reports.
    */
   MetricsTableAggregateSource getAggregateSource();
 
+  /**
+   * Update the split transaction time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateSplitTime(long t);
+
+  /**
+   * Increment the number of requested splits
+   */
+  void incrSplitRequest();
+
+  /**
+   * Increment number of successful splits
+   */
+  void incrSplitSuccess();
+
+  /**
+   * Update the flush time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFlushTime(long t);
+
+  /**
+   * Update the flush memstore size histogram
+   * @param bytes the number of bytes in the memstore
+   */
+  void updateFlushMemstoreSize(long bytes);
+
+  /**
+   * Update the flush output file size histogram
+   * @param bytes the number of bytes in the output file
+   */
+  void updateFlushOutputSize(long bytes);
+
+  /**
+   * Update the compaction time histogram, both major and minor
+   * @param isMajor whether compaction is a major compaction
+   * @param t time it took, in milliseconds
+   */
+  void updateCompactionTime(boolean isMajor, long t);
+
+  /**
+   * Update the compaction input number of files histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param c number of files
+   */
+  void updateCompactionInputFileCount(boolean isMajor, long c);
+
+  /**
+   * Update the compaction total input file size histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param bytes the number of bytes of the compaction input file
+   */
+  void updateCompactionInputSize(boolean isMajor, long bytes);
+
+  /**
+   * Update the compaction output number of files histogram
+   * @param isMajor whether compaction is a major compaction
+   * @param c number of files
+   */
+  void updateCompactionOutputFileCount(boolean isMajor, long c);
+
+  /**
+   * Update the compaction total output file size
+   * @param isMajor whether compaction is a major compaction
+   * @param bytes the number of bytes of the compaction output file
+   */
+  void updateCompactionOutputSize(boolean isMajor, long bytes);
+
 }
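A sketch of how a caller might feed one minor and one major compaction through a per-table source; "tableSource" is an assumed MetricsTableSource instance and the numbers are arbitrary:

    // Minor compaction: only the compaction* histograms and counters are updated.
    tableSource.updateCompactionTime(false, 95L);
    tableSource.updateCompactionInputFileCount(false, 4);
    tableSource.updateCompactionInputSize(false, 64L * 1024 * 1024);
    tableSource.updateCompactionOutputFileCount(false, 1);
    tableSource.updateCompactionOutputSize(false, 48L * 1024 * 1024);
    // Major compaction: the same calls with isMajor = true also feed the majorCompaction* metrics.
    tableSource.updateCompactionTime(true, 1800L);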

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
index 85ea4f6..4e8988b 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregate.java
@@ -28,12 +28,17 @@ public interface MetricsTableWrapperAggregate {
   /**
    * Get the number of read requests that have been issued against this table
    */
-  long getReadRequestsCount(String table);
+  long getReadRequestCount(String table);
+
+  /**
+   * Get the total number of filtered read requests that have been issued against this table
+   */
+  long getFilteredReadRequestCount(String table);
 
   /**
    * Get the number of write requests that have been issued against this table
    */
-  long getWriteRequestsCount(String table);
+  long getWriteRequestCount(String table);
 
   /**
    * Get the total number of requests that have been issued against this table
@@ -41,17 +46,57 @@ public interface MetricsTableWrapperAggregate {
   long getTotalRequestsCount(String table);
 
   /**
-   * Get the memory store size against this table
+   * Get the memory store size for this table
    */
-  long getMemstoresSize(String table);
+  long getMemstoreSize(String table);
 
   /**
-   * Get the store file size against this table
+   * Get the store file size for this table
    */
-  long getStoreFilesSize(String table);
+  long getStoreFileSize(String table);
 
   /**
-   * Get the table region size against this table
+   * Get the table region size for this table
    */
   long getTableSize(String table);
+
+  /**
+   * Get the average region size for this table
+   */
+  long getAvgRegionSize(String table);
+
+  /**
+   * Get the number of regions hosted on this region server for this table
+   */
+  long getNumRegions(String table);
+
+  /**
+   * Get the number of stores hosted on this region server for this table
+   */
+  long getNumStores(String table);
+
+  /**
+   * Get the number of store files hosted for this table
+   */
+  long getNumStoreFiles(String table);
+
+  /**
+   * @return Max age of store files for this table
+   */
+  long getMaxStoreFileAge(String table);
+
+  /**
+   * @return Min age of store files for this table
+   */
+  long getMinStoreFileAge(String table);
+
+  /**
+   *  @return Average age of store files for this table
+   */
+  long getAvgStoreFileAge(String table);
+
+  /**
+   *  @return Number of reference files for this table
+   */
+  long getNumReferenceFiles(String table);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 8833026..d595e85 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -54,9 +54,10 @@ public class MetricsRegionServerSourceImpl
   private final MutableFastCounter slowGet;
   private final MutableFastCounter slowIncrement;
   private final MutableFastCounter slowAppend;
+
+  // split related metrics
   private final MutableFastCounter splitRequest;
   private final MutableFastCounter splitSuccess;
-
   private final MetricHistogram splitTimeHisto;
 
   // flush related metrics

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
index b46a21f..43186b8 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableAggregateSourceImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.hbase.metrics.Interns;
@@ -46,22 +47,46 @@ implements MetricsTableAggregateSource {
     super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
   }
 
-  @Override
-  public void register(String table, MetricsTableSource source) {
-    tableSources.put(table, source);
+  private void register(MetricsTableSource source) {
+    synchronized (this) {
+      source.registerMetrics();
+    }
   }
 
   @Override
-  public void deregister(String table) {
+  public void deleteTableSource(String table) {
     try {
-      tableSources.remove(table);
+      synchronized (this) {
+        MetricsTableSource source = tableSources.remove(table);
+        if (source != null) {
+          source.close();
+        }
+      }
     } catch (Exception e) {
       // Ignored. If this errors out it means that someone is double
-      // closing the region source and the region is already nulled out.
-      LOG.info(
-        "Error trying to remove " + table + " from " + this.getClass().getSimpleName(),
-        e);
+      // closing the table source and the table metrics are already nulled out.
+      LOG.info("Error trying to remove " + table + " from " + getClass().getSimpleName(), e);
+    }
+  }
+
+  @Override
+  public MetricsTableSource getOrCreateTableSource(String table,
+      MetricsTableWrapperAggregate wrapper) {
+    MetricsTableSource source = tableSources.get(table);
+    if (source != null) {
+      return source;
+    }
+    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+      .createTable(table, wrapper);
+    MetricsTableSource prev = tableSources.putIfAbsent(table, source);
+
+    if (prev != null) {
+      return prev;
+    } else {
+      // register the new metrics now
+      register(source);
     }
+    return source;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
index 5d1dd79..e97e887 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableSourceImpl.java
@@ -25,8 +25,54 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.metrics.Interns;
+import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_INPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_INPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.COMPACTION_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_MEMSTORE_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_MEMSTORE_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSHED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_MEMSTORE_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_MEMSTORE_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.FLUSH_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_INPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_INPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_OUTPUT_BYTES;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTED_OUTPUT_BYTES_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_INPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_FILE_COUNT;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_SIZE;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_OUTPUT_SIZE_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_TIME;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.MAJOR_COMPACTION_TIME_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_REQUEST_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_REQUEST_KEY;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_DESC;
+import static org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource.SPLIT_SUCCESS_KEY;
 
 @InterfaceAudience.Private
 public class MetricsTableSourceImpl implements MetricsTableSource {
@@ -46,12 +92,41 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   private final TableName tableName;
   private final int hashCode;
 
+  // split related metrics
+  private MutableFastCounter splitRequest;
+  private MutableFastCounter splitSuccess;
+  private MetricHistogram splitTimeHisto;
+
+  // flush related metrics
+  private MetricHistogram flushTimeHisto;
+  private MetricHistogram flushMemstoreSizeHisto;
+  private MetricHistogram flushOutputSizeHisto;
+  private MutableFastCounter flushedMemstoreBytes;
+  private MutableFastCounter flushedOutputBytes;
+
+  // compaction related metrics
+  private MetricHistogram compactionTimeHisto;
+  private MetricHistogram compactionInputFileCountHisto;
+  private MetricHistogram compactionInputSizeHisto;
+  private MetricHistogram compactionOutputFileCountHisto;
+  private MetricHistogram compactionOutputSizeHisto;
+  private MutableFastCounter compactedInputBytes;
+  private MutableFastCounter compactedOutputBytes;
+
+  private MetricHistogram majorCompactionTimeHisto;
+  private MetricHistogram majorCompactionInputFileCountHisto;
+  private MetricHistogram majorCompactionInputSizeHisto;
+  private MetricHistogram majorCompactionOutputFileCountHisto;
+  private MetricHistogram majorCompactionOutputSizeHisto;
+  private MutableFastCounter majorCompactedInputBytes;
+  private MutableFastCounter majorCompactedOutputBytes;
+
   public MetricsTableSourceImpl(String tblName,
       MetricsTableAggregateSourceImpl aggregate, MetricsTableWrapperAggregate tblWrapperAgg) {
     LOG.debug("Creating new MetricsTableSourceImpl for table ");
     this.tableName = TableName.valueOf(tblName);
     this.agg = aggregate;
-    agg.register(tblName, this);
+
     this.tableWrapperAgg = tblWrapperAgg;
     this.registry = agg.getMetricsRegistry();
     this.tableNamePrefix = "Namespace_" + this.tableName.getNamespaceAsString() +
@@ -60,6 +135,79 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   }
 
   @Override
+  public synchronized void registerMetrics() {
+    flushTimeHisto = registry.newTimeHistogram(tableNamePrefix + FLUSH_TIME, FLUSH_TIME_DESC);
+    flushMemstoreSizeHisto =
+        registry.newSizeHistogram(tableNamePrefix + FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC);
+    flushOutputSizeHisto =
+        registry.newSizeHistogram(tableNamePrefix + FLUSH_OUTPUT_SIZE, FLUSH_OUTPUT_SIZE_DESC);
+    flushedOutputBytes =
+        registry.newCounter(tableNamePrefix + FLUSHED_OUTPUT_BYTES, FLUSHED_OUTPUT_BYTES_DESC, 0L);
+    flushedMemstoreBytes = registry.newCounter(tableNamePrefix + FLUSHED_MEMSTORE_BYTES,
+      FLUSHED_MEMSTORE_BYTES_DESC, 0L);
+
+    compactionTimeHisto =
+        registry.newTimeHistogram(tableNamePrefix + COMPACTION_TIME, COMPACTION_TIME_DESC);
+    compactionInputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + COMPACTION_INPUT_FILE_COUNT, COMPACTION_INPUT_FILE_COUNT_DESC);
+    compactionInputSizeHisto = registry.newSizeHistogram(tableNamePrefix + COMPACTION_INPUT_SIZE,
+      COMPACTION_INPUT_SIZE_DESC);
+    compactionOutputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + COMPACTION_OUTPUT_FILE_COUNT, COMPACTION_OUTPUT_FILE_COUNT_DESC);
+    compactionOutputSizeHisto = registry.newSizeHistogram(tableNamePrefix + COMPACTION_OUTPUT_SIZE,
+      COMPACTION_OUTPUT_SIZE_DESC);
+    compactedInputBytes = registry.newCounter(tableNamePrefix + COMPACTED_INPUT_BYTES,
+      COMPACTED_INPUT_BYTES_DESC, 0L);
+    compactedOutputBytes = registry.newCounter(tableNamePrefix + COMPACTED_OUTPUT_BYTES,
+      COMPACTED_OUTPUT_BYTES_DESC, 0L);
+
+    majorCompactionTimeHisto = registry.newTimeHistogram(tableNamePrefix + MAJOR_COMPACTION_TIME,
+      MAJOR_COMPACTION_TIME_DESC);
+    majorCompactionInputFileCountHisto = registry.newHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_INPUT_FILE_COUNT, MAJOR_COMPACTION_INPUT_FILE_COUNT_DESC);
+    majorCompactionInputSizeHisto = registry.newSizeHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_INPUT_SIZE, MAJOR_COMPACTION_INPUT_SIZE_DESC);
+    majorCompactionOutputFileCountHisto =
+        registry.newHistogram(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_FILE_COUNT,
+          MAJOR_COMPACTION_OUTPUT_FILE_COUNT_DESC);
+    majorCompactionOutputSizeHisto = registry.newSizeHistogram(
+      tableNamePrefix + MAJOR_COMPACTION_OUTPUT_SIZE, MAJOR_COMPACTION_OUTPUT_SIZE_DESC);
+    majorCompactedInputBytes = registry.newCounter(tableNamePrefix + MAJOR_COMPACTED_INPUT_BYTES,
+      MAJOR_COMPACTED_INPUT_BYTES_DESC, 0L);
+    majorCompactedOutputBytes = registry.newCounter(tableNamePrefix + MAJOR_COMPACTED_OUTPUT_BYTES,
+      MAJOR_COMPACTED_OUTPUT_BYTES_DESC, 0L);
+
+    splitTimeHisto = registry.newTimeHistogram(tableNamePrefix + SPLIT_KEY);
+    splitRequest = registry.newCounter(tableNamePrefix + SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
+    splitSuccess = registry.newCounter(tableNamePrefix + SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
+  }
+
+  private void deregisterMetrics() {
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_MEMSTORE_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + FLUSH_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + FLUSHED_OUTPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + FLUSHED_MEMSTORE_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_INPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_INPUT_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_OUTPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + COMPACTION_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + COMPACTED_INPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + COMPACTED_OUTPUT_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_TIME);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_INPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_INPUT_SIZE);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_FILE_COUNT);
+    registry.removeHistogramMetrics(tableNamePrefix + MAJOR_COMPACTION_OUTPUT_SIZE);
+    registry.removeMetric(tableNamePrefix + MAJOR_COMPACTED_INPUT_BYTES);
+    registry.removeMetric(tableNamePrefix + MAJOR_COMPACTED_OUTPUT_BYTES);
+    registry.removeHistogramMetrics(tableNamePrefix + SPLIT_KEY);
+    registry.removeMetric(tableNamePrefix + SPLIT_REQUEST_KEY);
+    registry.removeMetric(tableNamePrefix + SPLIT_SUCCESS_KEY);
+  }
+
+  @Override
   public void close() {
     boolean wasClosed = closed.getAndSet(true);
 
@@ -70,7 +218,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
 
     // Before removing the metrics remove this table from the aggregate table bean.
     // This should mean that it's unlikely that snapshot and close happen at the same time.
-    agg.deregister(tableName.getNameAsString());
+    agg.deleteTableSource(tableName.getNameAsString());
 
     // While it's un-likely that snapshot and close happen at the same time it's still possible.
     // So grab the lock to ensure that all calls to snapshot are done before we remove the metrics
@@ -78,6 +226,7 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Removing table Metrics for table ");
       }
+      deregisterMetrics();
       tableWrapperAgg = null;
     }
   }
@@ -122,24 +271,52 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
       }
 
       if (this.tableWrapperAgg != null) {
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.READ_REQUEST_COUNT,
-          MetricsTableSource.READ_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getReadRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.WRITE_REQUEST_COUNT,
-          MetricsTableSource.WRITE_REQUEST_COUNT_DESC),
-          tableWrapperAgg.getWriteRequestsCount(tableName.getNameAsString()));
-        mrb.addCounter(Interns.info(tableNamePrefix + MetricsTableSource.TOTAL_REQUEST_COUNT,
-          MetricsTableSource.TOTAL_REQUEST_COUNT_DESC),
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.READ_REQUEST_COUNT,
+          MetricsRegionServerSource.READ_REQUEST_COUNT_DESC),
+          tableWrapperAgg.getReadRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(
+          Interns.info(tableNamePrefix + MetricsRegionServerSource.FILTERED_READ_REQUEST_COUNT,
+          MetricsRegionServerSource.FILTERED_READ_REQUEST_COUNT_DESC),
+          tableWrapperAgg.getFilteredReadRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.WRITE_REQUEST_COUNT,
+          MetricsRegionServerSource.WRITE_REQUEST_COUNT_DESC),
+          tableWrapperAgg.getWriteRequestCount(tableName.getNameAsString()));
+        mrb.addCounter(Interns.info(tableNamePrefix + MetricsRegionServerSource.TOTAL_REQUEST_COUNT,
+          MetricsRegionServerSource.TOTAL_REQUEST_COUNT_DESC),
           tableWrapperAgg.getTotalRequestsCount(tableName.getNameAsString()));
-        mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.MEMSTORE_SIZE,
-          MetricsTableSource.MEMSTORE_SIZE_DESC),
-          tableWrapperAgg.getMemstoresSize(tableName.getNameAsString()));
-        mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.STORE_FILE_SIZE,
-          MetricsTableSource.STORE_FILE_SIZE_DESC),
-          tableWrapperAgg.getStoreFilesSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MEMSTORE_SIZE,
+          MetricsRegionServerSource.MEMSTORE_SIZE_DESC),
+          tableWrapperAgg.getMemstoreSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STOREFILE_COUNT,
+          MetricsRegionServerSource.STOREFILE_COUNT_DESC),
+          tableWrapperAgg.getNumStoreFiles(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STOREFILE_SIZE,
+          MetricsRegionServerSource.STOREFILE_SIZE_DESC),
+          tableWrapperAgg.getStoreFileSize(tableName.getNameAsString()));
         mrb.addGauge(Interns.info(tableNamePrefix + MetricsTableSource.TABLE_SIZE,
           MetricsTableSource.TABLE_SIZE_DESC),
           tableWrapperAgg.getTableSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.AVERAGE_REGION_SIZE,
+          MetricsRegionServerSource.AVERAGE_REGION_SIZE_DESC),
+          tableWrapperAgg.getAvgRegionSize(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.REGION_COUNT,
+          MetricsRegionServerSource.REGION_COUNT_DESC),
+          tableWrapperAgg.getNumRegions(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.STORE_COUNT,
+          MetricsRegionServerSource.STORE_COUNT_DESC),
+          tableWrapperAgg.getNumStores(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MAX_STORE_FILE_AGE,
+          MetricsRegionServerSource.MAX_STORE_FILE_AGE_DESC),
+          tableWrapperAgg.getMaxStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.MIN_STORE_FILE_AGE,
+          MetricsRegionServerSource.MIN_STORE_FILE_AGE_DESC),
+          tableWrapperAgg.getMinStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.AVG_STORE_FILE_AGE,
+          MetricsRegionServerSource.AVG_STORE_FILE_AGE_DESC),
+          tableWrapperAgg.getAvgStoreFileAge(tableName.getNameAsString()));
+        mrb.addGauge(Interns.info(tableNamePrefix + MetricsRegionServerSource.NUM_REFERENCE_FILES,
+          MetricsRegionServerSource.NUM_REFERENCE_FILES_DESC),
+          tableWrapperAgg.getNumReferenceFiles(tableName.getNameAsString()));
       }
     }
   }
@@ -168,4 +345,80 @@ public class MetricsTableSourceImpl implements MetricsTableSource {
   public String getTableNamePrefix() {
     return tableNamePrefix;
   }
+
+  @Override
+  public void incrSplitRequest() {
+    splitRequest.incr();
+  }
+
+  @Override
+  public void incrSplitSuccess() {
+    splitSuccess.incr();
+  }
+
+  @Override
+  public void updateSplitTime(long t) {
+    splitTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFlushTime(long t) {
+    flushTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFlushMemstoreSize(long bytes) {
+    flushMemstoreSizeHisto.add(bytes);
+    flushedMemstoreBytes.incr(bytes);
+  }
+
+  @Override
+  public void updateFlushOutputSize(long bytes) {
+    flushOutputSizeHisto.add(bytes);
+    flushedOutputBytes.incr(bytes);
+  }
+
+  @Override
+  public void updateCompactionTime(boolean isMajor, long t) {
+    compactionTimeHisto.add(t);
+    if (isMajor) {
+      majorCompactionTimeHisto.add(t);
+    }
+  }
+
+  @Override
+  public void updateCompactionInputFileCount(boolean isMajor, long c) {
+    compactionInputFileCountHisto.add(c);
+    if (isMajor) {
+      majorCompactionInputFileCountHisto.add(c);
+    }
+  }
+
+  @Override
+  public void updateCompactionInputSize(boolean isMajor, long bytes) {
+    compactionInputSizeHisto.add(bytes);
+    compactedInputBytes.incr(bytes);
+    if (isMajor) {
+      majorCompactionInputSizeHisto.add(bytes);
+      majorCompactedInputBytes.incr(bytes);
+    }
+  }
+
+  @Override
+  public void updateCompactionOutputFileCount(boolean isMajor, long c) {
+    compactionOutputFileCountHisto.add(c);
+    if (isMajor) {
+      majorCompactionOutputFileCountHisto.add(c);
+    }
+  }
+
+  @Override
+  public void updateCompactionOutputSize(boolean isMajor, long bytes) {
+    compactionOutputSizeHisto.add(bytes);
+    compactedOutputBytes.incr(bytes);
+    if (isMajor) {
+      majorCompactionOutputSizeHisto.add(bytes);
+      majorCompactedOutputBytes.incr(bytes);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
new file mode 100644
index 0000000..83eb3eb
--- /dev/null
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
+
+  private String tableName;
+
+  public MetricsTableWrapperStub(String tableName) {
+    this.tableName = tableName;
+  }
+
+  @Override
+  public long getReadRequestCount(String table) {
+    return 10;
+  }
+
+  @Override
+  public long getWriteRequestCount(String table) {
+    return 20;
+  }
+
+  @Override
+  public long getTotalRequestsCount(String table) {
+    return 30;
+  }
+
+  @Override
+  public long getFilteredReadRequestCount(String table) {
+    return 40;
+  }
+
+  @Override
+  public long getMemstoreSize(String table) {
+    return 1000;
+  }
+
+  @Override
+  public long getStoreFileSize(String table) {
+    return 2000;
+  }
+
+  @Override
+  public long getTableSize(String table) {
+    return 3000;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public long getNumRegions(String table) {
+    return 11;
+  }
+
+  @Override
+  public long getNumStores(String table) {
+    return 22;
+  }
+
+  @Override
+  public long getNumStoreFiles(String table) {
+    return 33;
+  }
+
+  @Override
+  public long getMaxStoreFileAge(String table) {
+    return 44;
+  }
+
+  @Override
+  public long getMinStoreFileAge(String table) {
+    return 55;
+  }
+
+  @Override
+  public long getAvgStoreFileAge(String table) {
+    return 66;
+  }
+
+  @Override
+  public long getNumReferenceFiles(String table) {
+    return 77;
+  }
+
+  @Override
+  public long getAvgRegionSize(String table) {
+    return 88;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
index b39c467..17f769b 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java
@@ -191,5 +191,10 @@ public class TestMetricsRegionSourceImpl {
     public long getMaxFlushQueueSize() {
       return 0;
     }
+
+    @Override
+    public long getTotalRequestCount() {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
index 3a9e73d..7a70da1 100644
--- a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableSourceImpl.java
@@ -40,9 +40,12 @@ public class TestMetricsTableSourceImpl {
     MetricsRegionServerSourceFactory metricsFact =
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
 
-    MetricsTableSource one = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
-    MetricsTableSource oneClone = metricsFact.createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
-    MetricsTableSource two = metricsFact.createTable("TWOTABLE", new TableWrapperStub("TWOTABLE"));
+    MetricsTableSource one = metricsFact.createTable(
+      "ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
+    MetricsTableSource oneClone = metricsFact.createTable(
+      "ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
+    MetricsTableSource two = metricsFact.createTable(
+      "TWOTABLE", new MetricsTableWrapperStub("TWOTABLE"));
 
     assertEquals(0, one.compareTo(oneClone));
     assertEquals(one.hashCode(), oneClone.hashCode());
@@ -65,50 +68,7 @@ public class TestMetricsTableSourceImpl {
   public void testGetTableMetrics() throws Exception{
     MetricsTableSource oneTbl =
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-        .createTable("ONETABLE", new TableWrapperStub("ONETABLE"));
+        .createTable("ONETABLE", new MetricsTableWrapperStub("ONETABLE"));
     assertEquals("ONETABLE", oneTbl.getTableName());
   }
-
-  static class TableWrapperStub implements MetricsTableWrapperAggregate {
-
-    private String tableName;
-
-    public TableWrapperStub(String tableName) {
-      this.tableName = tableName;
-    }
-
-    @Override
-    public long getReadRequestsCount(String table) {
-      return 10;
-    }
-
-    @Override
-    public long getWriteRequestsCount(String table) {
-      return 20;
-    }
-
-    @Override
-    public long getTotalRequestsCount(String table) {
-      return 30;
-    }
-
-    @Override
-    public long getMemstoresSize(String table) {
-      return 1000;
-    }
-
-    @Override
-    public long getStoreFilesSize(String table) {
-      return 2000;
-    }
-
-    @Override
-    public long getTableSize(String table) {
-      return 3000;
-    }
-
-    public String getTableName() {
-      return tableName;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d333619..25f151d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2703,7 +2703,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus(msg);
 
     if (rsServices != null && rsServices.getMetrics() != null) {
-      rsServices.getMetrics().updateFlush(time - startTime,
+      rsServices.getMetrics().updateFlush(
+        getTableDesc().getTableName().getNameAsString(),
+        time - startTime,
         totalFlushableSizeOfFlushableStores, flushedOutputFileSize);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6c32643..0b4ea6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1450,7 +1450,7 @@ public class HRegionServer extends HasThread implements
       this.walFactory = setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metricsRegionServer = new MetricsRegionServer(
-          new MetricsRegionServerWrapperImpl(this), conf);
+          new MetricsRegionServerWrapperImpl(this), conf, metricsTable);
       this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
       // Now that we have a metrics source, start the pause monitor
       this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
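A minimal wiring sketch mirroring the change above, with the table-level metrics object constructed before it is handed to MetricsRegionServer so the reference passed in is non-null ("regionServer" and "conf" are assumed to be in scope):

    // Construct the per-table metrics holder first, then pass it to the region server metrics.
    MetricsTable metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(regionServer));
    MetricsRegionServer rsm = new MetricsRegionServer(
        new MetricsRegionServerWrapperImpl(regionServer), conf, metricsTable);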

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6ce41e3..e6792e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1380,9 +1380,10 @@ public class HStore implements Store {
       long now = EnvironmentEdgeManager.currentTime();
       if (region.getRegionServerServices() != null
           && region.getRegionServerServices().getMetrics() != null) {
-        region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
-          now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
-          outputBytes);
+        region.getRegionServerServices().getMetrics().updateCompaction(
+          region.getTableDesc().getTableName().getNameAsString(),
+          cr.isMajor(), now - compactionStartTime, cr.getFiles().size(),
+          newFiles.size(), cr.getSize(), outputBytes);
       }
 
       logCompactionEndMessage(cr, sfs, now, compactionStartTime);
@@ -2252,7 +2253,7 @@ public class HStore implements Store {
       latestTS = (createdTS > latestTS) ? createdTS : latestTS;
     }
     long now = EnvironmentEdgeManager.currentTime();
-    return now - latestTS;
+    return latestTS == 0 ? 0 : now - latestTS;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index f6035c1..9466886 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -46,15 +46,18 @@ public class MetricsRegionServer {
   private MetricsRegionServerSource serverSource;
   private MetricsRegionServerWrapper regionServerWrapper;
   private RegionServerTableMetrics tableMetrics;
+  private final MetricsTable metricsTable;
 
   private MetricRegistry metricRegistry;
   private Timer bulkLoadTimer;
 
-  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf) {
+  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper, Configuration conf,
+      MetricsTable metricsTable) {
     this(regionServerWrapper,
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
             .createServer(regionServerWrapper),
-        createTableMetrics(conf));
+        createTableMetrics(conf),
+        metricsTable);
 
     // Create hbase-metrics module based metrics. The registry should already be registered by the
     // MetricsRegionServerSource
@@ -66,10 +69,12 @@ public class MetricsRegionServer {
 
   MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
                       MetricsRegionServerSource serverSource,
-                      RegionServerTableMetrics tableMetrics) {
+                      RegionServerTableMetrics tableMetrics,
+                      MetricsTable metricsTable) {
     this.regionServerWrapper = regionServerWrapper;
     this.serverSource = serverSource;
     this.tableMetrics = tableMetrics;
+    this.metricsTable = metricsTable;
   }
 
   /**
@@ -193,19 +198,33 @@ public class MetricsRegionServer {
     serverSource.incrSplitSuccess();
   }
 
-  public void updateFlush(long t, long memstoreSize, long fileSize) {
+  public void updateFlush(String table, long t, long memstoreSize, long fileSize) {
     serverSource.updateFlushTime(t);
     serverSource.updateFlushMemstoreSize(memstoreSize);
     serverSource.updateFlushOutputSize(fileSize);
+
+    if (table != null) {
+      metricsTable.updateFlushTime(table, t);
+      metricsTable.updateFlushMemstoreSize(table, memstoreSize);
+      metricsTable.updateFlushOutputSize(table, fileSize);
+    }
   }
 
-  public void updateCompaction(boolean isMajor, long t, int inputFileCount, int outputFileCount,
-      long inputBytes, long outputBytes) {
+  public void updateCompaction(String table, boolean isMajor, long t, int inputFileCount,
+      int outputFileCount, long inputBytes, long outputBytes) {
     serverSource.updateCompactionTime(isMajor, t);
     serverSource.updateCompactionInputFileCount(isMajor, inputFileCount);
     serverSource.updateCompactionOutputFileCount(isMajor, outputFileCount);
     serverSource.updateCompactionInputSize(isMajor, inputBytes);
     serverSource.updateCompactionOutputSize(isMajor, outputBytes);
+
+    if (table != null) {
+      metricsTable.updateCompactionTime(table, isMajor, t);
+      metricsTable.updateCompactionInputFileCount(table, isMajor, inputFileCount);
+      metricsTable.updateCompactionOutputFileCount(table, isMajor, outputFileCount);
+      metricsTable.updateCompactionInputSize(table, isMajor, inputBytes);
+      metricsTable.updateCompactionOutputSize(table, isMajor, outputBytes);
+    }
   }
 
   public void updateBulkLoad(long millis) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 60085f6..f9923a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -155,6 +155,11 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
   }
 
   @Override
+  public long getTotalRequestCount() {
+    return getReadRequestCount() + getWriteRequestCount();
+  }
+
+  @Override
   public long getNumCompactionsFailed() {
     return this.region.compactionsFailed.get();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
index e08fda5..2e906c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTable.java
@@ -24,19 +24,67 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class MetricsTable {
   private final MetricsTableAggregateSource tableSourceAgg;
-  private MetricsTableWrapperAggregate tableWrapperAgg;
+  private MetricsTableWrapperAggregate wrapper;
 
   public MetricsTable(final MetricsTableWrapperAggregate wrapper) {
     tableSourceAgg = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
                                              .getTableAggregate();
-    this.tableWrapperAgg = wrapper;
+    this.wrapper = wrapper;
   }
 
   public MetricsTableWrapperAggregate getTableWrapperAgg() {
-    return tableWrapperAgg;
+    return wrapper;
   }
 
   public MetricsTableAggregateSource getTableSourceAgg() {
     return tableSourceAgg;
   }
+
+  public void incrSplitRequest(String table) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).incrSplitRequest();
+  }
+
+  public void incrSplitSuccess(String table) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).incrSplitSuccess();
+  }
+
+  public void updateSplitTime(String table, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateSplitTime(t);
+  }
+
+  public void updateFlushTime(String table, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushTime(t);
+  }
+
+  public void updateFlushMemstoreSize(String table, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushMemstoreSize(bytes);
+  }
+
+  public void updateFlushOutputSize(String table, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateFlushOutputSize(bytes);
+  }
+
+  public void updateCompactionTime(String table, boolean isMajor, long t) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper).updateCompactionTime(isMajor, t);
+  }
+
+  public void updateCompactionInputFileCount(String table, boolean isMajor, long c) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionInputFileCount(isMajor, c);
+  }
+
+  public void updateCompactionInputSize(String table, boolean isMajor, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionInputSize(isMajor, bytes);
+  }
+
+  public void updateCompactionOutputFileCount(String table, boolean isMajor, long c) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionOutputFileCount(isMajor, c);
+  }
+
+  public void updateCompactionOutputSize(String table, boolean isMajor, long bytes) {
+    tableSourceAgg.getOrCreateTableSource(table, wrapper)
+      .updateCompactionOutputSize(isMajor, bytes);
+  }
 }
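A short sketch of the flush path through the new delegating methods, assuming "metricsTable" is the MetricsTable instance held by MetricsRegionServer and the table name and byte counts are arbitrary:

    // Each call resolves (or creates) the table's MetricsTableSource and updates it.
    metricsTable.updateFlushTime("usertable", 120L);                      // flush duration in ms
    metricsTable.updateFlushMemstoreSize("usertable", 32L * 1024 * 1024); // bytes flushed from memstore
    metricsTable.updateFlushOutputSize("usertable", 8L * 1024 * 1024);    // bytes written to the new file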

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
index c5f0f7b..a2a3824 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperAggregateImpl.java
@@ -43,7 +43,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
   private Runnable runnable;
   private long period;
   private ScheduledFuture<?> tableMetricsUpdateTask;
-  private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<TableName, MetricsTableValues> metricsTableMap
+    = new ConcurrentHashMap<>();
 
   public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
     this.regionServer = regionServer;
@@ -51,8 +52,8 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000;
     this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
     this.runnable = new TableMetricsWrapperRunnable();
-    this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period, this.period,
-      TimeUnit.MILLISECONDS);
+    this.tableMetricsUpdateTask = this.executor.scheduleWithFixedDelay(this.runnable, period,
+      this.period, TimeUnit.MILLISECONDS);
   }
 
   public class TableMetricsWrapperRunnable implements Runnable {
@@ -62,33 +63,43 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       Map<TableName, MetricsTableValues> localMetricsTableMap = new HashMap<>();
 
       for (Region r : regionServer.getOnlineRegionsLocalContext()) {
-        TableName tbl= r.getTableDesc().getTableName();
-        MetricsTableValues metricsTable = localMetricsTableMap.get(tbl);
-        if (metricsTable == null) {
-          metricsTable = new MetricsTableValues();
-          localMetricsTableMap.put(tbl, metricsTable);
+        TableName tbl = r.getTableDesc().getTableName();
+        MetricsTableValues mt = localMetricsTableMap.get(tbl);
+        if (mt == null) {
+          mt = new MetricsTableValues();
+          localMetricsTableMap.put(tbl, mt);
         }
-        long tempStorefilesSize = 0;
-        for (Store store : r.getStores()) {
-          tempStorefilesSize += store.getStorefilesSize();
+
+        if (r.getStores() != null) {
+          for (Store store : r.getStores()) {
+            mt.storeFileCount += store.getStorefilesCount();
+            mt.memstoreSize += store.getMemStoreSize();
+            mt.storeFileSize += store.getStorefilesSize();
+            mt.referenceFileCount += store.getNumReferenceFiles();
+
+            mt.maxStoreFileAge = Math.max(mt.maxStoreFileAge, store.getMaxStoreFileAge());
+            mt.minStoreFileAge = Math.min(mt.minStoreFileAge, store.getMinStoreFileAge());
+            mt.totalStoreFileAge = store.getAvgStoreFileAge() * store.getStorefilesCount();
+            mt.storeCount += 1;
+          }
         }
-        metricsTable.setMemstoresSize(metricsTable.getMemstoresSize() + r.getMemstoreSize());
-        metricsTable.setStoreFilesSize(metricsTable.getStoreFilesSize() + tempStorefilesSize);
-        metricsTable.setTableSize(metricsTable.getMemstoresSize() + metricsTable.getStoreFilesSize());
-        metricsTable.setReadRequestsCount(metricsTable.getReadRequestsCount() + r.getReadRequestsCount());
-        metricsTable.setWriteRequestsCount(metricsTable.getWriteRequestsCount() + r.getWriteRequestsCount());
-        metricsTable.setTotalRequestsCount(metricsTable.getReadRequestsCount() + metricsTable.getWriteRequestsCount());
+
+        mt.regionCount += 1;
+
+        mt.readRequestCount += r.getReadRequestsCount();
+        mt.filteredReadRequestCount += getFilteredReadRequestCount(tbl.getNameAsString());
+        mt.writeRequestCount += r.getWriteRequestsCount();
       }
 
-      for(Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
+      for (Map.Entry<TableName, MetricsTableValues> entry : localMetricsTableMap.entrySet()) {
         TableName tbl = entry.getKey();
         if (metricsTableMap.get(tbl) == null) {
-          MetricsTableSource tableSource = CompatibilitySingletonFactory
-              .getInstance(MetricsRegionServerSourceFactory.class).createTable(tbl.getNameAsString(),
-                MetricsTableWrapperAggregateImpl.this);
+          // this will add the Wrapper to the list of TableMetrics
           CompatibilitySingletonFactory
-          .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate()
-          .register(tbl.getNameAsString(), tableSource);
+              .getInstance(MetricsRegionServerSourceFactory.class)
+              .getTableAggregate()
+              .getOrCreateTableSource(tbl.getNameAsString(), MetricsTableWrapperAggregateImpl.this);
+
         }
         metricsTableMap.put(entry.getKey(), entry.getValue());
       }
@@ -97,7 +108,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
       MetricsTableAggregateSource agg = CompatibilitySingletonFactory
           .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
       for (TableName table : existingTableNames) {
-        agg.deregister(table.getNameAsString());
+        agg.deleteTableSource(table.getNameAsString());
         if (metricsTableMap.get(table) != null) {
           metricsTableMap.remove(table);
         }
@@ -106,120 +117,165 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
   }
 
   @Override
-  public long getReadRequestsCount(String table) {
+  public long getReadRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
     if (metricsTable == null)
       return 0;
     else
-      return metricsTable.getReadRequestsCount();
+      return metricsTable.readRequestCount;
   }
 
   @Override
-  public long getWriteRequestsCount(String table) {
+  public long getFilteredReadRequestCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getWriteRequestsCount();
+    }
+    return metricsTable.filteredReadRequestCount;
+  }
+
+  @Override
+  public long getWriteRequestCount(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
+    }
+    return metricsTable.writeRequestCount;
   }
 
   @Override
   public long getTotalRequestsCount(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getTotalRequestsCount();
+    }
+    return metricsTable.readRequestCount + metricsTable.writeRequestCount;
   }
 
   @Override
-  public long getMemstoresSize(String table) {
+  public long getMemstoreSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getMemstoresSize();
+    }
+    return metricsTable.memstoreSize;
   }
 
   @Override
-  public long getStoreFilesSize(String table) {
+  public long getStoreFileSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getStoreFilesSize();
+    }
+    return metricsTable.storeFileSize;
   }
 
   @Override
   public long getTableSize(String table) {
     MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
-    if (metricsTable == null)
+    if (metricsTable == null) {
       return 0;
-    else
-      return metricsTable.getTableSize();
+    }
+    return metricsTable.memstoreSize + metricsTable.storeFileSize;
   }
 
   @Override
-  public void close() throws IOException {
-    tableMetricsUpdateTask.cancel(true);
+  public long getNumRegions(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
+    }
+    return metricsTable.regionCount;
   }
 
-  private static class MetricsTableValues {
-
-    private long totalRequestsCount;
-    private long readRequestsCount;
-    private long writeRequestsCount;
-    private long memstoresSize;
-    private long storeFilesSize;
-    private long tableSize;
-
-    public long getTotalRequestsCount() {
-      return totalRequestsCount;
+  @Override
+  public long getNumStores(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.storeCount;
+  }
 
-    public void setTotalRequestsCount(long totalRequestsCount) {
-      this.totalRequestsCount = totalRequestsCount;
+  @Override
+  public long getNumStoreFiles(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.storeFileCount;
+  }
 
-    public long getReadRequestsCount() {
-      return readRequestsCount;
+  @Override
+  public long getMaxStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.maxStoreFileAge;
+  }
 
-    public void setReadRequestsCount(long readRequestsCount) {
-      this.readRequestsCount = readRequestsCount;
+  @Override
+  public long getMinStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
 
-    public long getWriteRequestsCount() {
-      return writeRequestsCount;
-    }
+    return metricsTable.minStoreFileAge == Long.MAX_VALUE ? 0 : metricsTable.minStoreFileAge;
+  }
 
-    public void setWriteRequestsCount(long writeRequestsCount) {
-      this.writeRequestsCount = writeRequestsCount;
+  @Override
+  public long getAvgStoreFileAge(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
 
-    public long getMemstoresSize() {
-      return memstoresSize;
-    }
+    return metricsTable.storeFileCount == 0
+        ? 0
+        : (metricsTable.totalStoreFileAge / metricsTable.storeFileCount);
+  }
 
-    public void setMemstoresSize(long memstoresSize) {
-      this.memstoresSize = memstoresSize;
+  @Override
+  public long getNumReferenceFiles(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
+    return metricsTable.referenceFileCount;
+  }
 
-    public long getStoreFilesSize() {
-      return storeFilesSize;
+  @Override
+  public long getAvgRegionSize(String table) {
+    MetricsTableValues metricsTable = metricsTableMap.get(TableName.valueOf(table));
+    if (metricsTable == null) {
+      return 0;
     }
 
-    public void setStoreFilesSize(long storeFilesSize) {
-      this.storeFilesSize = storeFilesSize;
-    }
+    return metricsTable.regionCount == 0
+        ? 0
+        : (metricsTable.memstoreSize + metricsTable.storeFileSize) / metricsTable.regionCount;
+  }
 
-    public long getTableSize() {
-      return tableSize;
-    }
+  @Override
+  public void close() throws IOException {
+    tableMetricsUpdateTask.cancel(true);
+  }
 
-    public void setTableSize(long tableSize) {
-      this.tableSize = tableSize;
-    }
+  private static class MetricsTableValues {
+    long readRequestCount;
+    long filteredReadRequestCount;
+    long writeRequestCount;
+    long memstoreSize;
+    long regionCount;
+    long storeCount;
+    long storeFileCount;
+    long storeFileSize;
+    long maxStoreFileAge;
+    long minStoreFileAge = Long.MAX_VALUE;
+    long totalStoreFileAge;
+    long referenceFileCount;
   }
 
 }
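
[Editor's note] The derived per-table gauges above reduce to simple ratios over the accumulated MetricsTableValues fields. A small standalone sketch of the same arithmetic follows; the class and method names are invented for illustration and are not part of the patch.

  // Standalone illustration of the derived per-table gauges computed above.
  // The fields mirror MetricsTableValues; the class itself is hypothetical.
  class TableAgg {
    long memstoreSize, storeFileSize, storeFileCount, totalStoreFileAge, regionCount;

    long avgStoreFileAge() {
      // Guard against divide-by-zero exactly as getAvgStoreFileAge(String) does.
      return storeFileCount == 0 ? 0 : totalStoreFileAge / storeFileCount;
    }

    long avgRegionSize() {
      // Table size is memstore plus store files, spread over the region count.
      return regionCount == 0 ? 0 : (memstoreSize + storeFileSize) / regionCount;
    }
  }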

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
index 4f18144..8714347 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java
@@ -157,4 +157,9 @@ public class MetricsRegionWrapperStub implements MetricsRegionWrapper {
   public long getMaxFlushQueueSize() {
     return 6;
   }
+
+  @Override
+  public long getTotalRequestCount() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
deleted file mode 100644
index 6fd8dd7..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsTableWrapperStub.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-public class MetricsTableWrapperStub implements MetricsTableWrapperAggregate {
-
-  private String tableName;
-
-  public MetricsTableWrapperStub(String tableName) {
-    this.tableName = tableName;
-  }
-
-  @Override
-  public long getReadRequestsCount(String table) {
-    return 10;
-  }
-
-  @Override
-  public long getWriteRequestsCount(String table) {
-    return 20;
-  }
-
-  @Override
-  public long getTotalRequestsCount(String table) {
-    return 30;
-  }
-
-  @Override
-  public long getMemstoresSize(String table) {
-    return 1000;
-  }
-
-  @Override
-  public long getStoreFilesSize(String table) {
-    return 2000;
-  }
-
-  @Override
-  public long getTableSize(String table) {
-    return 3000;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-}
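
[Editor's note] The stub deleted here is superseded by one that implements the renamed MetricsTableWrapperAggregate methods; the updated TestMetricsTableAggregate below instantiates it as new MetricsTableWrapperStub(tableName). Since the new file is not shown in this diff, here is a hedged sketch of what such a stub could look like. The method set is inferred from the @Override annotations in MetricsTableWrapperAggregateImpl above, and the return values match the assertions in the updated test; this is an illustration, not the committed file.

  // Hedged sketch of a stub for the renamed MetricsTableWrapperAggregate
  // methods; return values chosen to match TestMetricsTableAggregate below.
  public class IllustrativeTableWrapperStub implements MetricsTableWrapperAggregate {
    @Override public long getReadRequestCount(String table) { return 10; }
    @Override public long getWriteRequestCount(String table) { return 20; }
    @Override public long getFilteredReadRequestCount(String table) { return 0; }
    @Override public long getTotalRequestsCount(String table) { return 30; }
    @Override public long getMemstoreSize(String table) { return 1000; }
    @Override public long getStoreFileSize(String table) { return 2000; }
    @Override public long getTableSize(String table) { return 3000; }
    @Override public long getNumRegions(String table) { return 11; }
    @Override public long getNumStores(String table) { return 22; }
    @Override public long getNumStoreFiles(String table) { return 33; }
    @Override public long getMaxStoreFileAge(String table) { return 44; }
    @Override public long getMinStoreFileAge(String table) { return 55; }
    @Override public long getAvgStoreFileAge(String table) { return 66; }
    @Override public long getNumReferenceFiles(String table) { return 77; }
    @Override public long getAvgRegionSize(String table) { return 88; }
  }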

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index f954393..7807c43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -49,7 +49,7 @@ public class TestMetricsRegionServer {
   @Before
   public void setUp() {
     wrapper = new MetricsRegionServerWrapperStub();
-    rsm = new MetricsRegionServer(wrapper, new Configuration(false));
+    rsm = new MetricsRegionServer(wrapper, new Configuration(false), null);
     serverSource = rsm.getMetricsSource();
   }
 
@@ -175,14 +175,14 @@ public class TestMetricsRegionServer {
 
   @Test
   public void testFlush() {
-    rsm.updateFlush(1, 2, 3);
+    rsm.updateFlush(null, 1, 2, 3);
     HELPER.assertCounter("flushTime_num_ops", 1, serverSource);
     HELPER.assertCounter("flushMemstoreSize_num_ops", 1, serverSource);
     HELPER.assertCounter("flushOutputSize_num_ops", 1, serverSource);
     HELPER.assertCounter("flushedMemstoreBytes", 2, serverSource);
     HELPER.assertCounter("flushedOutputBytes", 3, serverSource);
 
-    rsm.updateFlush(10, 20, 30);
+    rsm.updateFlush(null, 10, 20, 30);
     HELPER.assertCounter("flushTimeNumOps", 2, serverSource);
     HELPER.assertCounter("flushMemstoreSize_num_ops", 2, serverSource);
     HELPER.assertCounter("flushOutputSize_num_ops", 2, serverSource);
@@ -192,7 +192,7 @@ public class TestMetricsRegionServer {
 
   @Test
   public void testCompaction() {
-    rsm.updateCompaction(false, 1, 2, 3, 4, 5);
+    rsm.updateCompaction(null, false, 1, 2, 3, 4, 5);
     HELPER.assertCounter("compactionTime_num_ops", 1, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 1, serverSource);
     HELPER.assertCounter("compactionInputSize_num_ops", 1, serverSource);
@@ -200,7 +200,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("compactedInputBytes", 4, serverSource);
     HELPER.assertCounter("compactedoutputBytes", 5, serverSource);
 
-    rsm.updateCompaction(false, 10, 20, 30, 40, 50);
+    rsm.updateCompaction(null, false, 10, 20, 30, 40, 50);
     HELPER.assertCounter("compactionTime_num_ops", 2, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 2, serverSource);
     HELPER.assertCounter("compactionInputSize_num_ops", 2, serverSource);
@@ -209,7 +209,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("compactedoutputBytes", 55, serverSource);
 
     // do major compaction
-    rsm.updateCompaction(true, 100, 200, 300, 400, 500);
+    rsm.updateCompaction(null, true, 100, 200, 300, 400, 500);
 
     HELPER.assertCounter("compactionTime_num_ops", 3, serverSource);
     HELPER.assertCounter("compactionInputFileCount_num_ops", 3, serverSource);
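
[Editor's note] The signature change visible in this hunk adds a table name as the first argument of updateFlush and updateCompaction. This server-level test passes null because its MetricsRegionServer is constructed with a null MetricsTable; the per-table path is exercised separately in TestMetricsTableAggregate below. A hedged sketch of the two call shapes (argument values are illustrative):

  // Server-only metrics: this instance was built with a null MetricsTable,
  // so the table argument is null and only the server-level source is asserted.
  rsm.updateFlush(null, 1, 2, 3);

  // Per-table metrics, as exercised in TestMetricsTableAggregate below:
  // the table name routes the same update into the table aggregate source.
  rsm.updateFlush("testTableMetrics", 1, 2, 3);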

http://git-wip-us.apache.org/repos/asf/hbase/blob/fb74f215/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
index c114298..bed4574 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableAggregate.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompatibilityFactory;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -32,21 +35,103 @@ public class TestMetricsTableAggregate {
   public static MetricsAssertHelper HELPER =
       CompatibilityFactory.getInstance(MetricsAssertHelper.class);
 
+  private String tableName = "testTableMetrics";
+  private String pre = "Namespace_default_table_" + tableName + "_metric_";
+
+  private MetricsTableWrapperStub tableWrapper;
+  private MetricsTable mt;
+  private MetricsRegionServerWrapper rsWrapper;
+  private MetricsRegionServer rsm;
+  private MetricsTableAggregateSource agg;
+
+  @BeforeClass
+  public static void classSetUp() {
+    HELPER.init();
+  }
+
+  @Before
+  public void setUp() {
+    tableWrapper = new MetricsTableWrapperStub(tableName);
+    mt = new MetricsTable(tableWrapper);
+    rsWrapper = new MetricsRegionServerWrapperStub();
+    Configuration conf = new Configuration();
+    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
+    agg = mt.getTableSourceAgg();
+  }
+
+  @Test
+  public void testRequestMetrics() throws IOException {
+    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
+    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
+    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
+  }
+
+  @Test
+  public void testRegionAndStoreMetrics() throws IOException {
+    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
+    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
+    HELPER.assertGauge(pre + "tableSize", 3000, agg);
+
+    HELPER.assertGauge(pre + "regionCount", 11, agg);
+    HELPER.assertGauge(pre + "storeCount", 22, agg);
+    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
+    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
+    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
+    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
+    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
+    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
+  }
+
   @Test
-  public void testTableWrapperAggregateMetrics() throws IOException {
-    String tableName = "testTableMetrics";
-    MetricsTableWrapperStub tableWrapper = new MetricsTableWrapperStub(tableName);
-    CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-    .createTable(tableName, tableWrapper);
-    MetricsTableAggregateSource agg = CompatibilitySingletonFactory
-        .getInstance(MetricsRegionServerSourceFactory.class).getTableAggregate();
-
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_readRequestCount", 10, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_writeRequestCount", 20, agg);
-    HELPER.assertCounter("Namespace_default_table_testTableMetrics_metric_totalRequestCount", 30, agg);
-
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_memstoreSize", 1000, agg);
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_storeFileSize", 2000, agg);
-    HELPER.assertGauge("Namespace_default_table_testTableMetrics_metric_tableSize", 3000, agg);
+  public void testFlush() {
+    rsm.updateFlush(tableName, 1, 2, 3);
+    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
+    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
+
+    rsm.updateFlush(tableName, 10, 20, 30);
+    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
+    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
+  }
+
+  @Test
+  public void testCompaction() {
+    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
+
+    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
+
+    // do major compaction
+    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
+
+    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
+    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
+    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
+
+    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
+    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
+    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
   }
 }
\ No newline at end of file
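
[Editor's note] For readers scanning the assertions above: the per-table metric key is just the namespace and table qualifier prepended to the metric name, as built in the test's "pre" field. A throwaway sketch of that prefix construction (the helper name is invented for illustration and is not an HBase API):

  // Builds the "Namespace_<ns>_table_<table>_metric_" prefix used in the
  // assertions above.
  static String tableMetricPrefix(String namespace, String table) {
    return "Namespace_" + namespace + "_table_" + table + "_metric_";
  }

  // tableMetricPrefix("default", "testTableMetrics") + "readRequestCount"
  //   -> "Namespace_default_table_testTableMetrics_metric_readRequestCount"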