Posted to commits@hbase.apache.org by ec...@apache.org on 2012/11/07 00:22:09 UTC

svn commit: r1406396 [5/6] - in /hbase/trunk: dev-support/ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/met...

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov  6 23:22:01 2012
@@ -48,13 +48,11 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.ObjectName;
 
 import com.google.protobuf.Message;
-import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -67,7 +65,6 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.FailedSanityCheckException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -107,9 +104,7 @@ import org.apache.hadoop.hbase.executor.
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -191,11 +186,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -225,6 +215,7 @@ import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
+import org.cliffc.high_scale_lib.Counter;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.base.Function;
@@ -297,9 +288,8 @@ public class  HRegionServer implements C
   // Instance of the hbase executor service.
   protected ExecutorService service;
 
-  // Request counter.
-  // Do we need this?  Can't we just sum region counters?  St.Ack 20110412
-  protected AtomicInteger requestCount = new AtomicInteger();
+  // Request counter. (Includes requests that are not serviced by regions.)
+  final Counter requestCount = new Counter();
 
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
@@ -366,9 +356,7 @@ public class  HRegionServer implements C
    */
   private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
 
-  private RegionServerMetrics metrics;
-
-  private RegionServerDynamicMetrics dynamicMetrics;
+  private MetricsRegionServer metricsRegionServer;
 
   /*
    * Check for compactions requests.
@@ -403,7 +391,7 @@ public class  HRegionServer implements C
   private final RegionServerAccounting regionServerAccounting;
 
   // Cache configuration and block cache reference
-  private final CacheConfig cacheConfig;
+  final CacheConfig cacheConfig;
 
   // reference to the Thrift Server.
   volatile private HRegionThriftServer thriftServer;
@@ -446,7 +434,6 @@ public class  HRegionServer implements C
    */
   private final QosFunction qosFunction;
 
-
   /**
    * Starts a HRegionServer at the default location
    *
@@ -550,6 +537,10 @@ public class  HRegionServer implements C
     }
   }
 
+  String getClusterId() {
+    return this.conf.get(HConstants.CLUSTER_ID);
+  }
+
   @Retention(RetentionPolicy.RUNTIME)
   protected @interface QosPriority {
     int priority() default 0;
@@ -858,7 +849,6 @@ public class  HRegionServer implements C
           break;
         }
       }
-      registerMBean();
 
       // We registered with the Master.  Go into run mode.
       long lastMsg = 0;
@@ -893,7 +883,6 @@ public class  HRegionServer implements C
         }
         long now = System.currentTimeMillis();
         if ((now - lastMsg) >= msgInterval) {
-          doMetrics();
           tryRegionServerReport(lastMsg, now);
           lastMsg = System.currentTimeMillis();
         }
@@ -1022,8 +1011,6 @@ public class  HRegionServer implements C
   void tryRegionServerReport(long reportStartTime, long reportEndTime)
   throws IOException {
     HBaseProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
-    // Why we do this?
-    this.requestCount.set(0);
     try {
       RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
       ServerName sn = ServerName.parseVersionedServerName(
@@ -1044,13 +1031,21 @@ public class  HRegionServer implements C
   }
 
   HBaseProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
+    // We're getting the MetricsRegionServerWrapper here because the wrapper computes
+    // requests per second and other metrics. As long as metrics are part of ServerLoad,
+    // it's best to use the wrapper to compute those numbers in one place.
+    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
+    // Instead they should be stored in an HBase table so that external visibility into
+    // HBase is improved; additionally, the load balancer will be able to take advantage
+    // of a more complete history.
+    MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
     Collection<HRegion> regions = getOnlineRegionsLocalContext();
     MemoryUsage memory =
       ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
 
     HBaseProtos.ServerLoad.Builder serverLoad = HBaseProtos.ServerLoad.newBuilder();
-    serverLoad.setNumberOfRequests((int)metrics.getRequests());
-    serverLoad.setTotalNumberOfRequests(requestCount.get());
+    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
+    serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
     serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
     serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
     Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
@@ -1205,8 +1200,7 @@ public class  HRegionServer implements C
       this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
       this.hlog = setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
-      this.metrics = new RegionServerMetrics();
-      this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
+      this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
       startServiceThreads();
       LOG.info("Serving as " + this.serverNameFromMasterPOV +
         ", RPC listening on " + this.isa +
@@ -1441,179 +1435,8 @@ public class  HRegionServer implements C
     return hlogRoller;
   }
 
-  /*
-   * @param interval Interval since last time metrics were called.
-   */
-  protected void doMetrics() {
-    try {
-      metrics();
-    } catch (Throwable e) {
-      LOG.warn("Failed metrics", e);
-    }
-  }
-
-  protected void metrics() {
-    this.metrics.regions.set(this.onlineRegions.size());
-    this.metrics.incrementRequests(this.requestCount.get());
-    this.metrics.requests.intervalHeartBeat();
-    // Is this too expensive every three seconds getting a lock on onlineRegions
-    // and then per store carried? Can I make metrics be sloppier and avoid
-    // the synchronizations?
-    int stores = 0;
-    int storefiles = 0;
-    long memstoreSize = 0;
-    int readRequestsCount = 0;
-    int writeRequestsCount = 0;
-    long checkAndMutateChecksFailed = 0;
-    long checkAndMutateChecksPassed = 0;
-    long storefileIndexSize = 0;
-    HDFSBlocksDistribution hdfsBlocksDistribution =
-      new HDFSBlocksDistribution();
-    long totalStaticIndexSize = 0;
-    long totalStaticBloomSize = 0;
-    long numPutsWithoutWAL = 0;
-    long dataInMemoryWithoutWAL = 0;
-    long updatesBlockedMs = 0;
-
-    // Note that this is a map of Doubles instead of Longs. This is because we
-    // do effective integer division, which would perhaps truncate more than it
-    // should because we do it only on one part of our sum at a time. Rather
-    // than dividing at the end, where it is difficult to know the proper
-    // factor, everything is exact then truncated.
-    final Map<String, MutableDouble> tempVals =
-        new HashMap<String, MutableDouble>();
-
-    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
-      HRegion r = e.getValue();
-      memstoreSize += r.memstoreSize.get();
-      numPutsWithoutWAL += r.numPutsWithoutWAL.get();
-      dataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
-      readRequestsCount += r.readRequestsCount.get();
-      writeRequestsCount += r.writeRequestsCount.get();
-      checkAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
-      checkAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
-      updatesBlockedMs += r.updatesBlockedMs.get();
-      synchronized (r.stores) {
-        stores += r.stores.size();
-        for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
-          final Store store = ee.getValue();
-            final SchemaMetrics schemaMetrics = store.getSchemaMetrics();
-
-            {
-              long tmpStorefiles = store.getStorefilesCount();
-              schemaMetrics.accumulateStoreMetric(tempVals,
-                  StoreMetricType.STORE_FILE_COUNT, tmpStorefiles);
-              storefiles += tmpStorefiles;
-            }
-
-
-            {
-              long tmpStorefileIndexSize = store.getStorefilesIndexSize();
-              schemaMetrics.accumulateStoreMetric(tempVals,
-                  StoreMetricType.STORE_FILE_INDEX_SIZE,
-                  (long) (tmpStorefileIndexSize / (1024.0 * 1024)));
-              storefileIndexSize += tmpStorefileIndexSize;
-            }
-
-            {
-              long tmpStorefilesSize = store.getStorefilesSize();
-              schemaMetrics.accumulateStoreMetric(tempVals,
-                  StoreMetricType.STORE_FILE_SIZE_MB,
-                  (long) (tmpStorefilesSize / (1024.0 * 1024)));
-            }
-
-            {
-              long tmpStaticBloomSize = store.getTotalStaticBloomSize();
-              schemaMetrics.accumulateStoreMetric(tempVals,
-                  StoreMetricType.STATIC_BLOOM_SIZE_KB,
-                  (long) (tmpStaticBloomSize / 1024.0));
-              totalStaticBloomSize += tmpStaticBloomSize;
-            }
-
-            {
-              long tmpStaticIndexSize = store.getTotalStaticIndexSize();
-              schemaMetrics.accumulateStoreMetric(tempVals,
-                  StoreMetricType.STATIC_INDEX_SIZE_KB,
-                  (long) (tmpStaticIndexSize / 1024.0));
-              totalStaticIndexSize += tmpStaticIndexSize;
-            }
-
-            schemaMetrics.accumulateStoreMetric(tempVals,
-                StoreMetricType.MEMSTORE_SIZE_MB,
-                (long) (store.getMemStoreSize() / (1024.0 * 1024)));
-        }
-      }
-
-      hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
-    }
-
-    for (Entry<String, MutableDouble> e : tempVals.entrySet()) {
-      RegionMetricsStorage.setNumericMetric(e.getKey(), e.getValue().longValue());
-    }
-
-    this.metrics.stores.set(stores);
-    this.metrics.storefiles.set(storefiles);
-    this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
-    this.metrics.mbInMemoryWithoutWAL.set((int) (dataInMemoryWithoutWAL / (1024 * 1024)));
-    this.metrics.numPutsWithoutWAL.set(numPutsWithoutWAL);
-    this.metrics.storefileIndexSizeMB.set(
-        (int) (storefileIndexSize / (1024 * 1024)));
-    this.metrics.rootIndexSizeKB.set(
-        (int) (storefileIndexSize / 1024));
-    this.metrics.totalStaticIndexSizeKB.set(
-        (int) (totalStaticIndexSize / 1024));
-    this.metrics.totalStaticBloomSizeKB.set(
-        (int) (totalStaticBloomSize / 1024));
-    this.metrics.readRequestsCount.set(readRequestsCount);
-    this.metrics.writeRequestsCount.set(writeRequestsCount);
-    this.metrics.checkAndMutateChecksFailed.set(checkAndMutateChecksFailed);
-    this.metrics.checkAndMutateChecksPassed.set(checkAndMutateChecksPassed);
-    this.metrics.compactionQueueSize.set(compactSplitThread
-        .getCompactionQueueSize());
-    this.metrics.flushQueueSize.set(cacheFlusher
-        .getFlushQueueSize());
-    this.metrics.updatesBlockedSeconds.update(updatesBlockedMs > 0 ? 
-        updatesBlockedMs/1000: 0);
-    final long updatesBlockedMsHigherWater = cacheFlusher.getUpdatesBlockedMsHighWater().get();
-    this.metrics.updatesBlockedSecondsHighWater.update(updatesBlockedMsHigherWater > 0 ? 
-        updatesBlockedMsHigherWater/1000: 0);
-
-    BlockCache blockCache = cacheConfig.getBlockCache();
-    if (blockCache != null) {
-      this.metrics.blockCacheCount.set(blockCache.size());
-      this.metrics.blockCacheFree.set(blockCache.getFreeSize());
-      this.metrics.blockCacheSize.set(blockCache.getCurrentSize());
-      CacheStats cacheStats = blockCache.getStats();
-      this.metrics.blockCacheHitCount.set(cacheStats.getHitCount());
-      this.metrics.blockCacheMissCount.set(cacheStats.getMissCount());
-      this.metrics.blockCacheEvictedCount.set(blockCache.getEvictedCount());
-      double ratio = blockCache.getStats().getHitRatio();
-      int percent = (int) (ratio * 100);
-      this.metrics.blockCacheHitRatio.set(percent);
-      ratio = blockCache.getStats().getHitCachingRatio();
-      percent = (int) (ratio * 100);
-      this.metrics.blockCacheHitCachingRatio.set(percent);
-      // past N period block cache hit / hit caching ratios
-      cacheStats.rollMetricsPeriod();
-      ratio = cacheStats.getHitRatioPastNPeriods();
-      percent = (int) (ratio * 100);
-      this.metrics.blockCacheHitRatioPastNPeriods.set(percent);
-      ratio = cacheStats.getHitCachingRatioPastNPeriods();
-      percent = (int) (ratio * 100);
-      this.metrics.blockCacheHitCachingRatioPastNPeriods.set(percent);
-    }
-    float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
-      getServerName().getHostname());
-    int percent = (int) (localityIndex * 100);
-    this.metrics.hdfsBlocksLocalityIndex.set(percent);
-
-  }
-
-  /**
-   * @return Region server metrics instance.
-   */
-  public RegionServerMetrics getMetrics() {
-    return this.metrics;
+  public MetricsRegionServer getMetrics() {
+    return this.metricsRegionServer;
   }
 
   /**
@@ -1841,9 +1664,6 @@ public class  HRegionServer implements C
     // java.util.HashSet's toString() method to print the coprocessor names.
     LOG.fatal("RegionServer abort: loaded coprocessors are: " +
         CoprocessorHost.getLoadedCoprocessors());
-    if (this.metrics != null) {
-      LOG.info("Dump of metrics: " + this.metrics);
-    }
     // Do our best to report our abort to the master, but this may not work
     try {
       if (cause != null) {
@@ -2146,45 +1966,7 @@ public class  HRegionServer implements C
   }
 
   /**
-   * @param encodedRegionName
-   * @return JSON Map of labels to values for passed in <code>encodedRegionName</code>
-   * @throws IOException
-   */
-  public byte [] getRegionStats(final String encodedRegionName)
-  throws IOException {
-    HRegion r = null;
-    synchronized (this.onlineRegions) {
-      r = this.onlineRegions.get(encodedRegionName);
-    }
-    if (r == null) return null;
-    ObjectMapper mapper = new ObjectMapper();
-    int stores = 0;
-    int storefiles = 0;
-    int storefileSizeMB = 0;
-    int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
-    int storefileIndexSizeMB = 0;
-    synchronized (r.stores) {
-      stores += r.stores.size();
-      for (Store store : r.stores.values()) {
-        storefiles += store.getStorefilesCount();
-        storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
-        storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
-      }
-    }
-    Map<String, Integer> map = new TreeMap<String, Integer>();
-    map.put("stores", stores);
-    map.put("storefiles", storefiles);
-    map.put("storefileSizeMB", storefileSizeMB);
-    map.put("storefileIndexSizeMB", storefileIndexSizeMB);
-    map.put("memstoreSizeMB", memstoreSizeMB);
-    StringWriter w = new StringWriter();
-    mapper.writeValue(w, map);
-    w.close();
-    return Bytes.toBytes(w.toString());
-  }
-
-  /**
-   * For tests and web ui.
+   * For tests, web ui and metrics.
    * This method will only work if HRegionServer is in the same JVM as client;
    * HRegion cannot be serialized to cross an rpc.
    * @see #getOnlineRegions()
@@ -2218,11 +2000,6 @@ public class  HRegionServer implements C
     return sortedRegions;
   }
 
-  /** @return the request count */
-  public AtomicInteger getRequestCount() {
-    return this.requestCount;
-  }
-
   /**
    * @return time stamp in millis of when this region server was started
    */
@@ -2498,16 +2275,6 @@ public class  HRegionServer implements C
   }
 
   /**
-   * Register bean with platform management server
-   */
-  void registerMBean() {
-    MXBeanImpl mxBeanInfo = MXBeanImpl.init(this);
-    mxBean = MBeanUtil.registerMBean("RegionServer", "RegionServer",
-        mxBeanInfo);
-    LOG.info("Registered RegionServer MXBean");
-  }
-
-  /**
    * Instantiated as a row lock lease. If the lease times out, the row lock is
    * released
    */
@@ -2685,14 +2452,7 @@ public class  HRegionServer implements C
     if (destination != null){
       addToMovedRegions(encodedRegionName, destination);
     }
-    
-    //Clear all of the dynamic metrics as they are now probably useless.
-    //This is a clear because dynamic metrics could include metrics per cf and
-    //per hfile.  Figuring out which cfs, hfiles, and regions are still relevant to
-    //this region server would be an onerous task.  Instead just clear everything
-    //and on the next tick of the metrics everything that is still relevant will be
-    //re-added.
-    this.dynamicMetrics.clear();
+
     return toReturn != null;
   }
 
@@ -2885,8 +2645,9 @@ public class  HRegionServer implements C
   @Override
   public GetResponse get(final RpcController controller,
       final GetRequest request) throws ServiceException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       GetResponse.Builder builder = GetResponse.newBuilder();
       ClientProtos.Get get = request.getGet();
@@ -2926,6 +2687,8 @@ public class  HRegionServer implements C
       return builder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
+    } finally {
+      metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
     }
   }
 
@@ -2940,7 +2703,7 @@ public class  HRegionServer implements C
   public MutateResponse mutate(final RpcController controller,
       final MutateRequest request) throws ServiceException {
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       MutateResponse.Builder builder = MutateResponse.newBuilder();
       Mutate mutate = request.getMutate();
@@ -3073,7 +2836,7 @@ public class  HRegionServer implements C
         }
         throw e;
       }
-      requestCount.incrementAndGet();
+      requestCount.increment();
 
       try {
         int ttl = 0;
@@ -3167,7 +2930,7 @@ public class  HRegionServer implements C
               for (int i = 0; i < rows
                   && currentScanResultSize < maxResultSize; i++) {
                 // Collect values to be returned here
-                boolean moreRows = scanner.next(values, SchemaMetrics.METRIC_NEXTSIZE);
+                boolean moreRows = scanner.next(values);
                 if (!values.isEmpty()) {
                   for (KeyValue kv : values) {
                     currentScanResultSize += kv.heapSize();
@@ -3261,7 +3024,7 @@ public class  HRegionServer implements C
         throw new DoNotRetryIOException(
           "lockRow supports only one row now, not " + request.getRowCount() + " rows");
       }
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       byte[] row = request.getRow(0).toByteArray();
       try {
@@ -3292,7 +3055,7 @@ public class  HRegionServer implements C
   public UnlockRowResponse unlockRow(final RpcController controller,
       final UnlockRowRequest request) throws ServiceException {
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       if (!request.hasLockId()) {
         throw new DoNotRetryIOException(
@@ -3327,7 +3090,7 @@ public class  HRegionServer implements C
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
       for (FamilyPath familyPath: request.getFamilyPathList()) {
@@ -3374,7 +3137,7 @@ public class  HRegionServer implements C
   public ExecCoprocessorResponse execCoprocessor(final RpcController controller,
       final ExecCoprocessorRequest request) throws ServiceException {
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       ExecCoprocessorResponse.Builder
         builder = ExecCoprocessorResponse.newBuilder();
@@ -3392,7 +3155,7 @@ public class  HRegionServer implements C
   public CoprocessorServiceResponse execService(final RpcController controller,
       final CoprocessorServiceRequest request) throws ServiceException {
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       // ignore the passed in controller (from the serialized call)
       ServerRpcController execController = new ServerRpcController();
@@ -3441,7 +3204,7 @@ public class  HRegionServer implements C
         ActionResult.Builder resultBuilder = null;
         List<Mutate> mutates = new ArrayList<Mutate>();
         for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
-          requestCount.incrementAndGet();
+          requestCount.increment();
           try {
             Object result = null;
             if (actionUnion.hasGet()) {
@@ -3524,7 +3287,7 @@ public class  HRegionServer implements C
       final GetRegionInfoRequest request) throws ServiceException {
     try {
       checkOpen();
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       HRegionInfo info = region.getRegionInfo();
       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
@@ -3544,7 +3307,7 @@ public class  HRegionServer implements C
       final GetStoreFileRequest request) throws ServiceException {
     try {
       HRegion region = getRegion(request.getRegion());
-      requestCount.incrementAndGet();
+      requestCount.increment();
       Set<byte[]> columnFamilies = null;
       if (request.getFamilyCount() == 0) {
         columnFamilies = region.getStores().keySet();
@@ -3571,7 +3334,7 @@ public class  HRegionServer implements C
       final GetOnlineRegionRequest request) throws ServiceException {
     try {
       checkOpen();
-      requestCount.incrementAndGet();
+      requestCount.increment();
       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
       for (HRegion region: this.onlineRegions.values()) {
         list.add(region.getRegionInfo());
@@ -3602,7 +3365,7 @@ public class  HRegionServer implements C
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
-    requestCount.incrementAndGet();
+    requestCount.increment();
     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
     int regionCount = request.getOpenInfoCount();
     Map<String, HTableDescriptor> htds =
@@ -3694,7 +3457,6 @@ public class  HRegionServer implements C
 
     try {
       checkOpen();
-      requestCount.incrementAndGet();
       String encodedRegionName =
         ProtobufUtil.getRegionEncodedName(request.getRegion());
       byte[] encodedName = Bytes.toBytes(encodedRegionName);
@@ -3706,6 +3468,7 @@ public class  HRegionServer implements C
         checkIfRegionInTransition(encodedName, CLOSE);
       }
       HRegion region = getRegionByEncodedName(encodedRegionName);
+      requestCount.increment();
       LOG.info("Received close region: " + region.getRegionNameAsString() +
         ". Version of ZK closing node:" + versionOfClosingNode +
         ". Destination server:" + sn);
@@ -3734,7 +3497,7 @@ public class  HRegionServer implements C
       final FlushRegionRequest request) throws ServiceException {
     try {
       checkOpen();
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       LOG.info("Flushing " + region.getRegionNameAsString());
       boolean shouldFlush = true;
@@ -3765,7 +3528,7 @@ public class  HRegionServer implements C
       final SplitRegionRequest request) throws ServiceException {
     try {
       checkOpen();
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       LOG.info("Splitting " + region.getRegionNameAsString());
       region.flushcache();
@@ -3794,7 +3557,7 @@ public class  HRegionServer implements C
       final CompactRegionRequest request) throws ServiceException {
     try {
       checkOpen();
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       LOG.info("Compacting " + region.getRegionNameAsString());
       boolean major = false;
@@ -3829,7 +3592,7 @@ public class  HRegionServer implements C
     try {
       if (replicationSinkHandler != null) {
         checkOpen();
-        requestCount.incrementAndGet();
+        requestCount.increment();
         HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList());
         if (entries != null && entries.length > 0) {
           replicationSinkHandler.replicateLogEntries(entries);
@@ -3852,7 +3615,7 @@ public class  HRegionServer implements C
   public RollWALWriterResponse rollWALWriter(final RpcController controller,
       final RollWALWriterRequest request) throws ServiceException {
     try {
-      requestCount.incrementAndGet();
+      requestCount.increment();
       HLog wal = this.getWAL();
       byte[][] regionsToFlush = wal.rollWriter(true);
       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
@@ -3877,7 +3640,7 @@ public class  HRegionServer implements C
   @Override
   public StopServerResponse stopServer(final RpcController controller,
       final StopServerRequest request) throws ServiceException {
-    requestCount.incrementAndGet();
+    requestCount.increment();
     String reason = request.getReason();
     stop(reason);
     return StopServerResponse.newBuilder().build();
@@ -3894,7 +3657,7 @@ public class  HRegionServer implements C
   public GetServerInfoResponse getServerInfo(final RpcController controller,
       final GetServerInfoRequest request) throws ServiceException {
     ServerName serverName = getServerName();
-    requestCount.incrementAndGet();
+    requestCount.increment();
     return ResponseConverter.buildGetServerInfoResponse(serverName, webuiport);
   }
 
@@ -3924,6 +3687,7 @@ public class  HRegionServer implements C
    */
   protected Result append(final HRegion region,
       final Mutate mutate) throws IOException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     Append append = ProtobufUtil.toAppend(mutate);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
@@ -3936,6 +3700,7 @@ public class  HRegionServer implements C
         region.getCoprocessorHost().postAppend(append, r);
       }
     }
+    metricsRegionServer.updateAppend(EnvironmentEdgeManager.currentTimeMillis() - before);
     return r;
   }
 
@@ -3949,6 +3714,7 @@ public class  HRegionServer implements C
    */
   protected Result increment(final HRegion region,
       final Mutate mutate) throws IOException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     Increment increment = ProtobufUtil.toIncrement(mutate);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
@@ -3961,6 +3727,7 @@ public class  HRegionServer implements C
         r = region.getCoprocessorHost().postIncrement(increment, r);
       }
     }
+    metricsRegionServer.updateIncrement(EnvironmentEdgeManager.currentTimeMillis() - before);
     return r;
   }
 
@@ -3975,7 +3742,8 @@ public class  HRegionServer implements C
       final HRegion region, final List<Mutate> mutates) {
     @SuppressWarnings("unchecked")
     Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutates.size()];
-
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
       ActionResult.Builder resultBuilder = ActionResult.newBuilder();
       NameBytesPair value = ProtobufUtil.toParameter(new Result());
@@ -3987,15 +3755,18 @@ public class  HRegionServer implements C
         Mutation mutation = null;
         if (m.getMutateType() == MutateType.PUT) {
           mutation = ProtobufUtil.toPut(m);
+          batchContainsPuts = true;
         } else {
           mutation = ProtobufUtil.toDelete(m);
+          batchContainsDelete = true;
         }
         Integer lock = getLockFromId(mutation.getLockId());
         mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, lock);
         builder.addResult(result);
       }
 
-      requestCount.addAndGet(mutates.size());
+
+      requestCount.add(mutates.size());
       if (!region.getRegionInfo().isMetaTable()) {
         cacheFlusher.reclaimMemStoreMemory();
       }
@@ -4031,6 +3802,13 @@ public class  HRegionServer implements C
         builder.setResult(i, result);
       }
     }
+    long after = EnvironmentEdgeManager.currentTimeMillis();
+    if (batchContainsPuts) {
+      metricsRegionServer.updatePut(after - before);
+    }
+    if (batchContainsDelete) {
+      metricsRegionServer.updateDelete(after - before);
+    }
   }
 
   /**
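
To make the counting change above concrete: this commit swaps the AtomicInteger
request counter for org.cliffc.high_scale_lib.Counter. Cliff Click's Counter
stripes its value across cells, so concurrent increment() calls from many RPC
handler threads do not all contend on one compare-and-swap the way
AtomicInteger.incrementAndGet() does; the cost is that get() must sum the
stripes. A minimal standalone sketch of just the Counter API as this commit
uses it (increment, add, get):

    import org.cliffc.high_scale_lib.Counter;

    public class CounterSketch {
      public static void main(String[] args) throws InterruptedException {
        final Counter requestCount = new Counter();
        Runnable worker = new Runnable() {
          public void run() {
            for (int i = 0; i < 100000; i++) {
              requestCount.increment();   // no shared CAS retry loop
            }
          }
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start(); t2.start();
        t1.join(); t2.join();
        requestCount.add(5);              // bulk add, as the batch-mutate path above does
        System.out.println(requestCount.get()); // sums the stripes: prints 200005
      }
    }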

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Nov  6 23:22:01 2012
@@ -66,8 +66,6 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -106,7 +104,7 @@ import com.google.common.collect.Lists;
  * not be called directly but by an HRegion manager.
  */
 @InterfaceAudience.Private
-public class HStore extends SchemaConfigured implements Store {
+public class HStore implements Store {
   static final Log LOG = LogFactory.getLog(HStore.class);
 
   protected final MemStore memstore;
@@ -174,9 +172,7 @@ public class HStore extends SchemaConfig
   protected HStore(Path basedir, HRegion region, HColumnDescriptor family,
       FileSystem fs, Configuration confParam)
   throws IOException {
-    super(new CompoundConfiguration().add(confParam).add(
-        family.getValues()), region.getRegionInfo().getTableNameAsString(),
-        Bytes.toString(family.getName()));
+
     HRegionInfo info = region.getRegionInfo();
     this.fs = fs;
     // Assemble the store's home directory.
@@ -260,6 +256,15 @@ public class HStore extends SchemaConfig
     return ttl;
   }
 
+  public String getColumnFamilyName() {
+    return this.family.getNameAsString();
+  }
+
+  @Override
+  public String getTableName() {
+    return this.region.getTableDesc().getNameAsString();
+  }
+
   /**
    * Create this store's homedir
    * @param fs
@@ -414,7 +419,6 @@ public class HStore extends SchemaConfig
         public StoreFile call() throws IOException {
           StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
               family.getBloomFilterType(), dataBlockEncoder);
-          passSchemaMetricsTo(storeFile);
           storeFile.createReader();
           return storeFile;
         }
@@ -573,7 +577,6 @@ public class HStore extends SchemaConfig
 
     StoreFile sf = new StoreFile(fs, dstPath, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), this.dataBlockEncoder);
-    passSchemaMetricsTo(sf);
 
     StoreFile.Reader r = sf.createReader();
     this.storeSize += r.length();
@@ -817,19 +820,11 @@ public class HStore extends SchemaConfig
     status.setStatus("Flushing " + this + ": reopening flushed file");
     StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
         this.family.getBloomFilterType(), this.dataBlockEncoder);
-    passSchemaMetricsTo(sf);
 
     StoreFile.Reader r = sf.createReader();
     this.storeSize += r.length();
     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
 
-    // This increments the metrics associated with total flushed bytes for this
-    // family. The overall flush count is stored in the static metrics and
-    // retrieved from HRegion.recentFlushes, which is set within
-    // HRegion.internalFlushcache, which indirectly calls this to actually do
-    // the flushing through the StoreFlusherImpl class
-    getSchemaMetrics().updatePersistentStoreMetric(
-        SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushedSize.longValue());
     if (LOG.isInfoEnabled()) {
       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
@@ -875,11 +870,6 @@ public class HStore extends SchemaConfig
             .withBytesPerChecksum(bytesPerChecksum)
             .withCompression(compression)
             .build();
-    // The store file writer's path does not include the CF name, so we need
-    // to configure the HFile writer directly.
-    SchemaConfigured sc = (SchemaConfigured) w.writer;
-    SchemaConfigured.resetSchemaMetricsConf(sc);
-    passSchemaMetricsTo(sc);
     return w;
   }
 
@@ -1409,8 +1399,8 @@ public class HStore extends SchemaConfig
       (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
       (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
     );
-    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-      this.getColumnFamilyName() + ": Initiating " +
+    LOG.debug(this.getHRegionInfo().getEncodedName() + " - "
+        + this.getColumnFamilyName() + ": Initiating " +
      (majorcompaction ? "major" : "minor") + " compaction");
 
     if (!majorcompaction &&
@@ -1523,7 +1513,6 @@ public class HStore extends SchemaConfig
       storeFile = new StoreFile(this.fs, path, this.conf,
           this.cacheConf, this.family.getBloomFilterType(),
           NoOpDataBlockEncoder.INSTANCE);
-      passSchemaMetricsTo(storeFile);
       storeFile.createReader();
     } catch (IOException e) {
       LOG.error("Failed to open store file : " + path
@@ -1575,7 +1564,6 @@ public class HStore extends SchemaConfig
       }
       result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
           this.family.getBloomFilterType(), this.dataBlockEncoder);
-      passSchemaMetricsTo(result);
       result.createReader();
     }
     try {
@@ -1936,7 +1924,7 @@ public class HStore extends SchemaConfig
 
   @Override
   public String toString() {
-    return getColumnFamilyName();
+    return this.getColumnFamilyName();
   }
 
   @Override
@@ -2125,9 +2113,8 @@ public class HStore extends SchemaConfig
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+      ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+              + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
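
The FIXED_OVERHEAD change at the end of this file is plain heap-size
accounting: with SchemaConfigured gone, HStore no longer adds the superclass's
unaligned header size and instead counts its own fields directly. A sketch of
the arithmetic, using illustrative 64-bit sizes rather than the real ClassSize
and Bytes constants:

    // Illustrative only: the real sizes come from
    // org.apache.hadoop.hbase.util.ClassSize and Bytes.
    public class OverheadSketch {
      static final int REFERENCE = 8, SIZEOF_LONG = 8,
          SIZEOF_INT = 4, SIZEOF_BOOLEAN = 1;

      // Round up to the JVM's 8-byte object alignment, like ClassSize.align.
      static long align(long size) {
        return ((size + 7) / 8) * 8;
      }

      public static void main(String[] args) {
        // 19 refs + 6 longs + 5 ints + 1 boolean = 152 + 48 + 20 + 1 = 221,
        // which aligns up to 224 bytes.
        System.out.println(align(19 * REFERENCE + 6 * SIZEOF_LONG
            + 5 * SIZEOF_INT + SIZEOF_BOOLEAN));
      }
    }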

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Nov  6 23:22:01 2012
@@ -416,7 +416,6 @@ class MemStoreFlusher extends HasThread 
         server.compactSplitThread.requestCompaction(region, getName());
       }
 
-      server.getMetrics().addFlush(region.getRecentFlushInfo());
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical
       // section, we get a DroppedSnapshotException and a replay of hlog

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java Tue Nov  6 23:22:01 2012
@@ -0,0 +1,66 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+
+
+/**
+ * This is the glue between the HRegion and whatever hadoop shim layer
+ * is loaded (hbase-hadoop1-compat or hbase-hadoop2-compat).
+ */
+@InterfaceAudience.Private
+public class MetricsRegion {
+
+  private MetricsRegionSource source;
+
+  public MetricsRegion(MetricsRegionWrapper wrapper) {
+    source = CompatibilityFactory.getInstance(MetricsRegionServerSourceFactory.class)
+                                             .createRegion(wrapper);
+  }
+
+  public void close() {
+    source.close();
+  }
+
+  public void updatePut() {
+    source.updatePut();
+  }
+
+  public void updateDelete() {
+    source.updateDelete();
+  }
+
+  public void updateGet() {
+    source.updateGet();
+  }
+
+  public void updateAppend() {
+    source.updateAppend();
+  }
+
+  public void updateIncrement() {
+    source.updateIncrement();
+  }
+
+  MetricsRegionSource getSource() {
+    return source;
+  }
+}
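
MetricsRegion is pure delegation: the constructor asks the compatibility
factory for whichever MetricsRegionServerSourceFactory the hadoop1 or hadoop2
compat jar provides, and every update method forwards to the resulting
MetricsRegionSource. Note that unlike the server-level class below, the
per-region updates carry no latency argument; they are counts, not histograms.
A usage sketch, assuming (it is not shown in this excerpt) that
MetricsRegionWrapperImpl wraps an HRegion the way its server-side twin wraps
an HRegionServer:

    MetricsRegion regionMetrics =
        new MetricsRegion(new MetricsRegionWrapperImpl(region));
    regionMetrics.updatePut();   // count one put against this region
    regionMetrics.updateGet();   // count one get
    regionMetrics.close();       // unregister the source when the region closes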

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java Tue Nov  6 23:22:01 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+
+/**
+ * This class is for maintaining the various regionserver statistics
+ * and publishing them through the metrics interfaces.
+ * <p>
+ * Updates are delegated to a MetricsRegionServerSource obtained from the
+ * loaded compatibility shim, which handles the actual publishing.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Private
+public class MetricsRegionServer {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  private MetricsRegionServerSource serverSource;
+  private MetricsRegionServerWrapper regionServerWrapper;
+
+  public MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper) {
+    this.regionServerWrapper = regionServerWrapper;
+    serverSource =
+            CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+            .createServer(regionServerWrapper);
+  }
+
+  // for unit-test usage
+  public MetricsRegionServerSource getMetricsSource() {
+    return serverSource;
+  }
+
+  public MetricsRegionServerWrapper getRegionServerWrapper() {
+    return regionServerWrapper;
+  }
+
+  public void updatePut(long t){
+    serverSource.updatePut(t);
+  }
+
+  public void updateDelete(long t){
+    serverSource.updateDelete(t);
+  }
+
+  public void updateGet(long t){
+    serverSource.updateGet(t);
+  }
+
+  public void updateIncrement(long t){
+    serverSource.updateIncrement(t);
+  }
+
+  public void updateAppend(long t){
+    serverSource.updateAppend(t);
+  }
+}
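
The server-level class is just as thin: the source is resolved once through
CompatibilitySingletonFactory, and each update is a single delegation, so the
per-operation cost is one histogram update. Condensed from the
HRegionServer.get() hunk earlier in this commit, the intended call pattern is:

    long before = EnvironmentEdgeManager.currentTimeMillis();
    try {
      // ... service the Get ...
    } finally {
      // Recorded on success and on failure, because of the finally block.
      metricsRegionServer.updateGet(
          EnvironmentEdgeManager.currentTimeMillis() - before);
    }

In unit tests, getMetricsSource() exposes the underlying
MetricsRegionServerSource so assertions can be made against the shim directly.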

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java Tue Nov  6 23:22:01 2012
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.metrics2.MetricsExecutor;
+
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Impl for exposing HRegionServer Information through Hadoop's metrics 2 system.
+ */
+@InterfaceAudience.Private
+class MetricsRegionServerWrapperImpl
+    implements MetricsRegionServerWrapper {
+
+  public static final Log LOG = LogFactory.getLog(MetricsRegionServerWrapperImpl.class);
+
+  public static final int PERIOD = 15;
+
+  private final HRegionServer regionServer;
+  private final BlockCache blockCache;
+
+  private volatile long numStores = 0;
+  private volatile long numStoreFiles = 0;
+  private volatile long memstoreSize = 0;
+  private volatile long storeFileSize = 0;
+  private volatile double requestsPerSecond = 0.0;
+  private volatile long readRequestsCount = 0;
+  private volatile long writeRequestsCount = 0;
+  private volatile long checkAndMutateChecksFailed = 0;
+  private volatile long checkAndMutateChecksPassed = 0;
+  private volatile long storefileIndexSize = 0;
+  private volatile long totalStaticIndexSize = 0;
+  private volatile long totalStaticBloomSize = 0;
+  private volatile long numPutsWithoutWAL = 0;
+  private volatile long dataInMemoryWithoutWAL = 0;
+  private volatile int percentFileLocal = 0;
+
+  private CacheStats cacheStats;
+  private ScheduledExecutorService executor;
+  private Runnable runnable;
+
+  public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
+    this.regionServer = regionServer;
+    this.blockCache = this.regionServer.cacheConfig.getBlockCache();
+    this.cacheStats = blockCache.getStats();
+    this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
+    this.runnable = new RegionServerMetricsWrapperRunnable();
+    this.executor.scheduleWithFixedDelay(this.runnable, PERIOD, PERIOD, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public String getClusterId() {
+    return regionServer.getClusterId();
+  }
+
+  @Override
+  public long getStartCode() {
+    return regionServer.getStartcode();
+  }
+
+  @Override
+  public String getZookeeperQuorum() {
+    ZooKeeperWatcher zk = regionServer.getZooKeeperWatcher();
+    if (zk == null) {
+      return "";
+    }
+    return zk.getQuorum();
+  }
+
+  @Override
+  public String getCoprocessors() {
+    String[] coprocessors = regionServer.getCoprocessors();
+    if (coprocessors == null || coprocessors.length == 0) {
+      return "";
+    }
+    return StringUtils.join(coprocessors, ", ");
+  }
+
+  @Override
+  public String getServerName() {
+    ServerName serverName = regionServer.getServerName();
+    if (serverName == null) {
+      return "";
+    }
+    return serverName.getServerName();
+  }
+
+  @Override
+  public long getNumOnlineRegions() {
+    Collection<HRegion> onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext();
+    if (onlineRegionsLocalContext == null) {
+      return 0;
+    }
+    return onlineRegionsLocalContext.size();
+  }
+
+  @Override
+  public long getTotalRequestCount() {
+    return regionServer.requestCount.get();
+  }
+
+  @Override
+  public int getCompactionQueueSize() {
+    // The compact/split thread could be null; if so, assume there is no queue.
+    if (this.regionServer.compactSplitThread == null) {
+      return 0;
+    }
+    return this.regionServer.compactSplitThread.getCompactionQueueSize();
+  }
+
+  @Override
+  public int getFlushQueueSize() {
+    //If there is no flusher there should be no queue.
+    if (this.regionServer.cacheFlusher == null) {
+      return 0;
+    }
+    return this.regionServer.cacheFlusher.getFlushQueueSize();
+  }
+
+  @Override
+  public long getBlockCacheCount() {
+    if (this.blockCache == null) {
+      return 0;
+    }
+    return this.blockCache.size();
+  }
+
+  @Override
+  public long getBlockCacheSize() {
+    if (this.blockCache == null) {
+      return 0;
+    }
+    return this.blockCache.getCurrentSize();
+  }
+
+  @Override
+  public long getBlockCacheFreeSize() {
+    if (this.blockCache == null) {
+      return 0;
+    }
+    return this.blockCache.getFreeSize();
+  }
+
+  @Override
+  public long getBlockCacheHitCount() {
+    if (this.cacheStats == null) {
+      return 0;
+    }
+    return this.cacheStats.getHitCount();
+  }
+
+  @Override
+  public long getBlockCacheMissCount() {
+    if (this.cacheStats == null) {
+      return 0;
+    }
+    return this.cacheStats.getMissCount();
+  }
+
+  @Override
+  public long getBlockCacheEvictedCount() {
+    if (this.cacheStats == null) {
+      return 0;
+    }
+    return this.cacheStats.getEvictedCount();
+  }
+
+  @Override
+  public int getBlockCacheHitPercent() {
+    if (this.cacheStats == null) {
+      return 0;
+    }
+    return (int) (this.cacheStats.getHitRatio() * 100);
+  }
+
+  @Override
+  public int getBlockCacheHitCachingPercent() {
+    if (this.cacheStats == null) {
+      return 0;
+    }
+    return (int) (this.cacheStats.getHitCachingRatio() * 100);
+  }
+
+  @Override
+  public void forceRecompute() {
+    this.runnable.run();
+  }
+
+  @Override
+  public long getNumStores() {
+    return numStores;
+  }
+
+  @Override
+  public long getNumStoreFiles() {
+    return numStoreFiles;
+  }
+
+  @Override
+  public long getMemstoreSize() {
+    return memstoreSize;
+  }
+
+  @Override
+  public long getStoreFileSize() {
+    return storeFileSize;
+  }
+
+  @Override
+  public double getRequestsPerSecond() {
+    return requestsPerSecond;
+  }
+
+  @Override
+  public long getReadRequestsCount() {
+    return readRequestsCount;
+  }
+
+  @Override
+  public long getWriteRequestsCount() {
+    return writeRequestsCount;
+  }
+
+  @Override
+  public long getCheckAndMutateChecksFailed() {
+    return checkAndMutateChecksFailed;
+  }
+
+  @Override
+  public long getCheckAndMutateChecksPassed() {
+    return checkAndMutateChecksPassed;
+  }
+
+  @Override
+  public long getStoreFileIndexSize() {
+    return storefileIndexSize;
+  }
+
+  @Override
+  public long getTotalStaticIndexSize() {
+    return totalStaticIndexSize;
+  }
+
+  @Override
+  public long getTotalStaticBloomSize() {
+    return totalStaticBloomSize;
+  }
+
+  @Override
+  public long getNumPutsWithoutWAL() {
+    return numPutsWithoutWAL;
+  }
+
+  @Override
+  public long getDataInMemoryWithoutWAL() {
+    return dataInMemoryWithoutWAL;
+  }
+
+  @Override
+  public int getPercentFileLocal() {
+    return percentFileLocal;
+  }
+
+  @Override
+  public long getUpdatesBlockedTime() {
+    if (this.regionServer.cacheFlusher == null) {
+      return 0;
+    }
+    return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get();
+  }
+
+
+  /**
+   * This is the runnable that will be executed on the executor every PERIOD seconds.
+   * It takes metrics/numbers from all of the regions and uses them to compute
+   * point-in-time metrics.
+   */
+  public class RegionServerMetricsWrapperRunnable implements Runnable {
+
+    private long lastRan = 0;
+    private long lastRequestCount = 0;
+
+    @Override
+    public synchronized void run() {
+
+      cacheStats = blockCache.getStats();
+
+      HDFSBlocksDistribution hdfsBlocksDistribution =
+          new HDFSBlocksDistribution();
+
+      long tempNumStores = 0;
+      long tempNumStoreFiles = 0;
+      long tempMemstoreSize = 0;
+      long tempStoreFileSize = 0;
+      long tempReadRequestsCount = 0;
+      long tempWriteRequestsCount = 0;
+      long tempCheckAndMutateChecksFailed = 0;
+      long tempCheckAndMutateChecksPassed = 0;
+      long tempStorefileIndexSize = 0;
+      long tempTotalStaticIndexSize = 0;
+      long tempTotalStaticBloomSize = 0;
+      long tempNumPutsWithoutWAL = 0;
+      long tempDataInMemoryWithoutWAL = 0;
+      int tempPercentFileLocal = 0;
+
+
+      for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
+        tempNumPutsWithoutWAL += r.numPutsWithoutWAL.get();
+        tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
+        tempReadRequestsCount += r.readRequestsCount.get();
+        tempWriteRequestsCount += r.writeRequestsCount.get();
+        tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
+        tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
+        tempNumStores += r.stores.size();
+        for (Store store : r.stores.values()) {
+          tempNumStoreFiles += store.getStorefilesCount();
+          tempMemstoreSize += store.getMemStoreSize();
+          tempStoreFileSize += store.getStorefilesSize();
+          tempStorefileIndexSize += store.getStorefilesIndexSize();
+          tempTotalStaticBloomSize += store.getTotalStaticBloomSize();
+          tempTotalStaticIndexSize += store.getTotalStaticIndexSize();
+        }
+
+        hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
+      }
+
+      float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
+          regionServer.getServerName().getHostname());
+      tempPercentFileLocal = (int) (localityIndex * 100);
+
+      // Compute the number of requests per second handled since the last run
+      // of this runnable.
+      long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+
+      // On the first run, assume the executor took PERIOD seconds to start.
+      // This is a guess, but a pretty good one.
+      if (lastRan == 0) {
+        lastRan = currentTime - (PERIOD * 1000);
+      }
+
+      // If the clock went backwards or almost no time has passed, keep the
+      // previous requests-per-second value rather than divide by a tiny or negative interval.
+      if ((currentTime - lastRan) > 10) {
+        long currentRequestCount = getTotalRequestCount();
+        requestsPerSecond = (currentRequestCount - lastRequestCount) / ((currentTime - lastRan) / 1000.0);
+        lastRequestCount = currentRequestCount;
+      }
+      lastRan = currentTime;
+
+      // Copy over the computed values so that no thread sees half-computed values.
+      numStores = tempNumStores;
+      numStoreFiles = tempNumStoreFiles;
+      memstoreSize = tempMemstoreSize;
+      storeFileSize = tempStoreFileSize;
+      readRequestsCount = tempReadRequestsCount;
+      writeRequestsCount = tempWriteRequestsCount;
+      checkAndMutateChecksFailed = tempCheckAndMutateChecksFailed;
+      checkAndMutateChecksPassed = tempCheckAndMutateChecksPassed;
+      storefileIndexSize = tempStorefileIndexSize;
+      totalStaticIndexSize = tempTotalStaticIndexSize;
+      totalStaticBloomSize = tempTotalStaticBloomSize;
+      numPutsWithoutWAL = tempNumPutsWithoutWAL;
+      dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL;
+      percentFileLocal = tempPercentFileLocal;
+    }
+  }
+
+}

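The requests-per-second logic above is a windowed rate over a cumulative counter: rate = (count_now - count_prev) / elapsed_seconds, seeded on the first run by pretending one PERIOD has already elapsed. A minimal standalone sketch of that pattern (class and field names are illustrative, not part of this patch):

    // Illustrative sketch only; mirrors the windowed-rate computation above.
    public class RequestRateTracker {
      private static final int PERIOD = 45; // sampling period in seconds (assumed)

      private long lastRan = 0;           // time of the previous sample, ms
      private long lastRequestCount = 0;  // cumulative count at the previous sample
      private volatile double requestsPerSecond;

      /** Feed the current cumulative request count, typically on a fixed schedule. */
      public synchronized void sample(long currentRequestCount, long currentTimeMs) {
        if (lastRan == 0) {
          // First run: assume one full period has elapsed so the first rate is sane.
          lastRan = currentTimeMs - PERIOD * 1000;
        }
        long elapsedMs = currentTimeMs - lastRan;
        // Guard against clock skew or a near-zero interval; keep the previous rate.
        if (elapsedMs > 10) {
          requestsPerSecond =
              (currentRequestCount - lastRequestCount) / (elapsedMs / 1000.0);
          lastRequestCount = currentRequestCount;
        }
        lastRan = currentTimeMs;
      }

      /** Readers see the last fully computed value without locking. */
      public double getRequestsPerSecond() {
        return requestsPerSecond;
      }
    }
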
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java Tue Nov  6 23:22:01 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.metrics2.MetricsExecutor;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class MetricsRegionWrapperImpl implements MetricsRegionWrapper {
+
+  public static final int PERIOD = 45;
+
+  private final HRegion region;
+  private ScheduledExecutorService executor;
+  private Runnable runnable;
+  private long numStoreFiles;
+  private long memstoreSize;
+  private long storeFileSize;
+
+  public MetricsRegionWrapperImpl(HRegion region) {
+    this.region = region;
+    this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
+    this.runnable = new HRegionMetricsWrapperRunnable();
+    this.executor.scheduleWithFixedDelay(this.runnable, PERIOD, PERIOD, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public String getTableName() {
+    return this.region.getTableDesc().getNameAsString();
+  }
+
+  @Override
+  public String getRegionName() {
+    return this.region.getRegionInfo().getEncodedName();
+  }
+
+  @Override
+  public long getNumStores() {
+    return this.region.stores.size();
+  }
+
+  @Override
+  public long getNumStoreFiles() {
+    return numStoreFiles;
+  }
+
+  @Override
+  public long getMemstoreSize() {
+    return memstoreSize;
+  }
+
+  @Override
+  public long getStoreFileSize() {
+    return storeFileSize;
+  }
+
+  @Override
+  public long getReadRequestCount() {
+    return this.region.getReadRequestsCount();
+  }
+
+  @Override
+  public long getWriteRequestCount() {
+    return this.region.getWriteRequestsCount();
+  }
+
+  public class HRegionMetricsWrapperRunnable implements Runnable {
+
+    @Override
+    public void run() {
+      long tempNumStoreFiles = 0;
+      long tempMemstoreSize = 0;
+      long tempStoreFileSize = 0;
+
+      for (Store store : region.stores.values()) {
+        tempNumStoreFiles += store.getStorefilesCount();
+        tempMemstoreSize += store.getMemStoreSize();
+        tempStoreFileSize += store.getStorefilesSize();
+      }
+
+      numStoreFiles = tempNumStoreFiles;
+      memstoreSize = tempMemstoreSize;
+      storeFileSize = tempStoreFileSize;
+    }
+  }
+
+}
\ No newline at end of file

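MetricsRegionWrapperImpl applies the same snapshot idea at region scope: a background task walks the stores every PERIOD seconds and publishes the totals with plain field writes, so the metric getters stay cheap. A self-contained sketch of that fixed-delay snapshot pattern, using plain JDK executors in place of HBase's shared MetricsExecutor (names are illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch of the fixed-delay snapshot pattern; not HBase code.
    public class SnapshotAggregator {
      private volatile long storeFileSize; // last fully computed total

      public void start() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        // scheduleWithFixedDelay waits 45 seconds after each run finishes, so a
        // slow aggregation pass never overlaps the next one.
        executor.scheduleWithFixedDelay(new Runnable() {
          @Override
          public void run() {
            aggregate();
          }
        }, 45, 45, TimeUnit.SECONDS);
      }

      private void aggregate() {
        long temp = 0;
        // ... sum the per-store sizes into temp ...
        storeFileSize = temp; // publish the snapshot in a single write
      }

      public long getStoreFileSize() {
        return storeFileSize; // readers never see a half-computed total
      }
    }
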
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java Tue Nov  6 23:22:01 2012
@@ -66,7 +66,6 @@ class SplitRequest implements Runnable {
       if (!st.prepare()) return;
       try {
         st.execute(this.server, this.server);
-        this.server.getMetrics().incrementSplitSuccessCount(System.currentTimeMillis() - startTime);
       } catch (Exception e) {
         if (this.server.isStopping() || this.server.isStopped()) {
           LOG.info(
@@ -81,7 +80,6 @@ class SplitRequest implements Runnable {
           if (st.rollback(this.server, this.server)) {
             LOG.info("Successful rollback of failed split of " +
               parent.getRegionNameAsString());
-            this.server.getMetrics().incrementSplitFailureCount();
           } else {
             this.server.abort("Abort; we got an error after point-of-no-return");
           }
@@ -102,7 +100,6 @@ class SplitRequest implements Runnable {
     } catch (IOException ex) {
       LOG.error("Split failed " + this, RemoteExceptionHandler
           .checkIOException(ex));
-      this.server.getMetrics().incrementSplitFailureCount();
       server.checkFileSystem();
     } finally {
       if (this.parent.getCoprocessorHost() != null) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Nov  6 23:22:01 2012
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
 
 import com.google.common.collect.ImmutableList;
 
@@ -42,7 +41,7 @@ import com.google.common.collect.Immutab
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface Store extends SchemaAware, HeapSize {
+public interface Store extends HeapSize {
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions. (Pri <= 0)
@@ -287,4 +286,10 @@ public interface Store extends SchemaAwa
    * @return the parent region hosting this store
    */
   public HRegion getHRegion();
+
+  /** @return the name of the column family this store serves */
+  public String getColumnFamilyName();
+
+  /** @return the name of the table this store belongs to */
+  public String getTableName();
 }

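With getColumnFamilyName() and getTableName() now on the Store interface itself, metrics code can qualify a metric name without the removed SchemaAware machinery. A hypothetical helper (not part of this patch) showing the kind of naming this enables:

    // Hypothetical helper, for illustration only.
    public final class StoreMetricNames {
      private StoreMetricNames() {
      }

      /** Builds e.g. "table.usertable.cf.info.storeFileCount". */
      public static String qualify(Store store, String metric) {
        return "table." + store.getTableName()
            + ".cf." + store.getColumnFamilyName()
            + "." + metric;
      }
    }
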
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Nov  6 23:22:01 2012
@@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -80,7 +78,7 @@ import com.google.common.collect.Orderin
 /**
  * A Store data file.  Stores usually have one or more of these files.  They
  * are produced by flushing the memstore to disk.  To
- * create, instantiate a writer using {@link StoreFile#WriterBuilder}
+ * create, instantiate a writer using {@link StoreFile.WriterBuilder}
  * and append data. Be sure to add any metadata before calling close on the
  * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
  * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
@@ -91,7 +89,7 @@ import com.google.common.collect.Orderin
  * writer and a reader is that we write once but read a lot more.
  */
 @InterfaceAudience.LimitedPrivate("Coprocessor")
-public class StoreFile extends SchemaConfigured {
+public class StoreFile {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
   public static enum BloomType {
@@ -277,7 +275,6 @@ public class StoreFile extends SchemaCon
       this.modificationTimeStamp = 0;
     }
 
-    SchemaMetrics.configureGlobally(conf);
   }
 
   /**
@@ -545,11 +542,6 @@ public class StoreFile extends SchemaCon
           dataBlockEncoder.getEncodingInCache());
     }
 
-    if (isSchemaConfigured()) {
-      SchemaConfigured.resetSchemaMetricsConf(reader);
-      passSchemaMetricsTo(reader);
-    }
-
     computeHDFSBlockDistribution();
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
@@ -1287,7 +1279,7 @@ public class StoreFile extends SchemaCon
   /**
    * Reader for a StoreFile.
    */
-  public static class Reader extends SchemaConfigured {
+  public static class Reader {
     static final Log LOG = LogFactory.getLog(Reader.class.getName());
 
     protected BloomFilter generalBloomFilter = null;
@@ -1301,7 +1293,6 @@ public class StoreFile extends SchemaCon
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
         DataBlockEncoding preferredEncodingInCache) throws IOException {
-      super(path);
       reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
           preferredEncodingInCache);
       bloomFilterType = BloomType.NONE;
@@ -1310,7 +1301,6 @@ public class StoreFile extends SchemaCon
     public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
         CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
         boolean closeIStream) throws IOException {
-      super(path);
 
       FSDataInputStream in = hfileLink.open(fs);
       FSDataInputStream inNoChecksum = in;
@@ -1584,7 +1574,6 @@ public class StoreFile extends SchemaCon
                 && bloomFilter.contains(key, 0, key.length, bloom);
           }
 
-          getSchemaMetrics().updateBloomMetrics(exists);
           return exists;
         }
       } catch (IOException e) {
@@ -1728,10 +1717,6 @@ public class StoreFile extends SchemaCon
       return reader.indexSize();
     }
 
-    public String getColumnFamilyName() {
-      return reader.getColumnFamilyName();
-    }
-
     public BloomType getBloomFilterType() {
       return this.bloomFilterType;
     }
@@ -1774,11 +1759,6 @@ public class StoreFile extends SchemaCon
     public long getMaxTimestamp() {
       return timeRangeTracker.maximumTimestamp;
     }
-
-    @Override
-    public void schemaConfigurationChanged() {
-      passSchemaMetricsTo((SchemaConfigured) reader);
-    }
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Nov  6 23:22:01 2012
@@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
-import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -110,7 +108,6 @@ public class StoreScanner extends NonLaz
                               throws IOException {
     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
         scanInfo.getMinVersions());
-    initializeMetricNames();
     if (columns != null && scan.isRaw()) {
       throw new DoNotRetryIOException(
           "Cannot specify any column for a raw scan");
@@ -163,7 +160,6 @@ public class StoreScanner extends NonLaz
       long smallestReadPoint, long earliestPutTs) throws IOException {
     this(store, false, scan, null, scanInfo.getTtl(),
         scanInfo.getMinVersions());
-    initializeMetricNames();
     matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
         smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
 
@@ -194,7 +190,6 @@ public class StoreScanner extends NonLaz
           throws IOException {
     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
         scanInfo.getMinVersions());
-    this.initializeMetricNames();
     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
         Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
 
@@ -206,23 +201,6 @@ public class StoreScanner extends NonLaz
   }
 
   /**
-   * Method used internally to initialize metric names throughout the
-   * constructors.
-   *
-   * To be called after the store variable has been initialized!
-   */
-  private void initializeMetricNames() {
-    String tableName = SchemaMetrics.UNKNOWN;
-    String family = SchemaMetrics.UNKNOWN;
-    if (store != null) {
-      tableName = store.getTableName();
-      family = Bytes.toString(store.getFamily().getName());
-    }
-    this.metricNamePrefix =
-        SchemaMetrics.generateSchemaMetricsPrefix(tableName, family);
-  }
-
-  /**
    * Get a filtered list of scanners. Assumes we are not in a compaction.
    * @return list of scanners to seek
    */
@@ -458,8 +436,4 @@ public class StoreScanner extends NonLaz
       }
     } finally {
-      if (cumulativeMetric > 0 && metric != null) {
-        RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
-            cumulativeMetric);
-      }
     }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Nov  6 23:22:01 2012
@@ -253,7 +253,6 @@ public class CompactionRequest implement
         LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
         if (completed) {
-          server.getMetrics().addCompaction(now - start, this.totalSize);
           // degenerate case: blocked regions require recursive enqueues
           if (s.getCompactPriority() <= 0) {
             server.compactSplitThread

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java Tue Nov  6 23:22:01 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * This class is for maintaining the various replication statistics for a sink and publishing them
+ * through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class MetricsSink {
+
+  public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
+  public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
+  public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+
+  private MetricsReplicationSource rms;
+
+  public MetricsSink() {
+    rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
+  }
+
+  /**
+   * Set the age of the last applied operation
+   *
+   * @param timestamp The timestamp of the last operation applied.
+   */
+  public void setAgeOfLastAppliedOp(long timestamp) {
+    long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp;
+    rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age);
+  }
+
+  /**
+   * Convenience method to update metrics when a batch of operations has been applied.
+   *
+   * @param batchSize the number of operations applied in the batch
+   */
+  public void applyBatch(long batchSize) {
+    rms.incCounters(SINK_APPLIED_BATCHES, 1);
+    rms.incCounters(SINK_APPLIED_OPS, batchSize);
+  }
+
+}

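Callers are expected to record a batch and then refresh the age gauge from the newest edit in that batch. A hedged usage sketch (the surrounding class and parameter names are illustrative; only the MetricsSink calls come from the patch):

    // Sketch of typical MetricsSink usage.
    public class SinkCaller {
      private final MetricsSink metrics = new MetricsSink();

      /** Called after a replicated batch has been applied to the local cluster. */
      public void onBatchApplied(long opCount, long newestEditWriteTime) {
        metrics.applyBatch(opCount);            // appliedBatches += 1, appliedOps += opCount
        metrics.setAgeOfLastAppliedOp(newestEditWriteTime); // gauge = now - write time
      }
    }
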
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java?rev=1406396&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java Tue Nov  6 23:22:01 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * This class is for maintaining the various replication statistics for a source and publishing them
+ * through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class MetricsSource {
+
+  public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
+  public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
+  public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
+  public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
+  public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
+  public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
+
+  public static final Log LOG = LogFactory.getLog(MetricsSource.class);
+  private String id;
+
+  private long lastTimestamp = 0;
+  private int lastQueueSize = 0;
+
+  private final String sizeOfLogQueueKey;
+  private final String ageOfLastShippedOpKey;
+  private final String logEditsReadKey;
+  private final String logEditsFilteredKey;
+  private final String shippedBatchesKey;
+  private final String shippedOpsKey;
+
+  private MetricsReplicationSource rms;
+
+  /**
+   * Constructor used to register the metrics
+   *
+   * @param id Name of the source this class is monitoring
+   */
+  public MetricsSource(String id) {
+    this.id = id;
+
+    sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue";
+    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    logEditsReadKey = "source." + id + ".logEditsRead";
+    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    shippedBatchesKey = "source." + this.id + ".shippedBatches";
+    shippedOpsKey = "source." + this.id + ".shippedOps";
+    rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
+  }
+
+  /**
+   * Set the age of the last edit that was shipped
+   *
+   * @param timestamp write time of the edit
+   */
+  public void setAgeOfLastShippedOp(long timestamp) {
+    long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp;
+    rms.setGauge(ageOfLastShippedOpKey, age);
+    rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
+    this.lastTimestamp = timestamp;
+  }
+
+  /**
+   * Convenience method that uses the last given timestamp to refresh the age of the last edit.
+   * Used when replication fails and we need to keep that metric accurate.
+   */
+  public void refreshAgeOfLastShippedOp() {
+    if (this.lastTimestamp > 0) {
+      setAgeOfLastShippedOp(this.lastTimestamp);
+    }
+  }
+
+  /**
+   * Set the size of the log queue
+   *
+   * @param size the size.
+   */
+  public void setSizeOfLogQueue(int size) {
+    rms.setGauge(sizeOfLogQueueKey, size);
+    rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
+    lastQueueSize = size;
+  }
+
+  /**
+   * Add to the number of log edits read
+   *
+   * @param delta the number of log edits read.
+   */
+  private void incrLogEditsRead(long delta) {
+    rms.incCounters(logEditsReadKey, delta);
+    rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
+  }
+
+  /** Increment the number of log edits read by one. */
+  public void incrLogEditsRead() {
+    incrLogEditsRead(1);
+  }
+
+  /**
+   * Add to the number of log edits filtered
+   *
+   * @param delta the number filtered.
+   */
+  private void incrLogEditsFiltered(long delta) {
+    rms.incCounters(logEditsFilteredKey, delta);
+    rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
+  }
+
+  /** Increment the number of log edits filtered by one. */
+  public void incrLogEditsFiltered() {
+    incrLogEditsFiltered(1);
+  }
+
+  /**
+   * Convenience method to apply metrics changes due to shipping a batch of logs.
+   *
+   * @param batchSize the size of the batch that was shipped to sinks.
+   */
+  public void shipBatch(long batchSize) {
+    rms.incCounters(shippedBatchesKey, 1);
+    rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
+    rms.incCounters(shippedOpsKey, batchSize);
+    rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
+  }
+
+  /** Removes all metrics about this Source. */
+  public void clear() {
+    rms.removeMetric(sizeOfLogQueueKey);
+    rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
+    lastQueueSize = 0;
+    rms.removeMetric(ageOfLastShippedOpKey);
+
+    rms.removeMetric(logEditsFilteredKey);
+    rms.removeMetric(logEditsReadKey);
+
+  }
+}

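A source publishes each statistic twice, under a per-peer key ("source.<id>....") and under the global "source." key, so per-peer and aggregate views stay in step. A sketch of the expected call sequence over a source's lifetime (values and the wrapper class are made up; the method calls are from the patch):

    // Illustrative lifecycle of a MetricsSource.
    public class SourceCaller {
      public static void main(String[] args) {
        MetricsSource metrics = new MetricsSource("peer-1"); // id = peer cluster znode
        metrics.setSizeOfLogQueue(3);        // per-peer gauge set; global gauge moved by the delta
        metrics.incrLogEditsRead();          // counters bumped under both keys
        metrics.shipBatch(25);               // one batch of 25 ops shipped
        metrics.refreshAgeOfLastShippedOp(); // after a failed ship, keep the age gauge honest
        metrics.clear();                     // source torn down: per-peer metrics removed
      }
    }
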
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Nov  6 23:22:01 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSinkMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
@@ -73,7 +72,7 @@ public class ReplicationSink {
   private final Configuration conf;
   private final ExecutorService sharedThreadPool;
   private final HConnection sharedHtableCon;
-  private final ReplicationSinkMetrics metrics;
+  private final MetricsSink metrics;
 
   /**
    * Create a sink for replication
@@ -86,7 +85,7 @@ public class ReplicationSink {
       throws IOException {
     this.conf = HBaseConfiguration.create(conf);
     decorateConf();
-    this.metrics = new ReplicationSinkMetrics();
+    this.metrics = new MetricsSink();
     this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
     this.sharedThreadPool = new ThreadPoolExecutor(1,
         conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1406396&r1=1406395&r2=1406396&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Nov  6 23:22:01 2012
@@ -56,12 +56,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
-import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
@@ -141,7 +137,7 @@ public class ReplicationSource extends T
   // Indicates if this particular source is running
   private volatile boolean running = true;
   // Metrics for this source
-  private ReplicationSourceMetrics metrics;
+  private MetricsSource metrics;
 
   /**
    * Instantiation method used by region servers
@@ -188,7 +184,7 @@ public class ReplicationSource extends T
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
-    this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
+    this.metrics = new MetricsSource(peerClusterZnode);
 
     try {
       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());