You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:21:50 UTC

svn commit: r1181575 - /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Author: nspiegelberg
Date: Tue Oct 11 02:21:50 2011
New Revision: 1181575

URL: http://svn.apache.org/viewvc?rev=1181575&view=rev
Log:
Added Get/Delete/Put per CF metrics in HRegion.

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181575&r1=1181574&r2=1181575&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:21:50 2011
@@ -46,8 +46,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.print.attribute.standard.Finishings;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -60,12 +58,12 @@ import org.apache.hadoop.hbase.DroppedSn
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -260,6 +258,53 @@ public class HRegion implements HeapSize
       new ConcurrentHashMap<String,
                             Pair<AtomicLong, AtomicInteger>>();
 
+  /**
+   * Method to transform a set of column families in byte[] format into a
+   * signature for printing out in metrics
+   *
+   * @param families
+   *          the ordered set of column families
+   * @return a string to print out in metrics
+   */
+  private String createMutationSignature(Set<byte[]> families) {
+    int limit = families.size();
+    if (1 == limit) {
+      return "cf." + Bytes.toString(families.iterator().next());
+    }
+
+    StringBuilder sb = new StringBuilder("cf.");
+
+    int MAX_SIZE = 256;
+    for (byte[] family : families) {
+      if (sb.length() > MAX_SIZE) {
+        sb.append("__more");
+        break;
+      }
+
+      --limit;
+      sb.append(Bytes.toString(family));
+      if (0 != limit) {
+        sb.append(":");
+      }
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Method to transform a single column family in byte[] format into a
+   * signature for printing out in metrics. Used as overloading so as to not
+   * create an extra Set. Could have gone further and imposed restriction on the
+   * Set version to be used for length > 1, but that puts strain on method user.
+   *
+   * @param family
+   *          the family to convert
+   * @return the string to print out in metrics
+   */
+  private String createMutationSignature(byte[] family) {
+    return "cf." + Bytes.toString(family);
+  }
+
   public static void incrNumericMetric(String key, long amount) {
     AtomicLong oldVal = numericMetrics.get(key);
     if (oldVal == null) {
@@ -1392,7 +1437,6 @@ public class HRegion implements HeapSize
     } finally {
       if(lockid == null) releaseRowLock(lid);
       splitsAndClosesLock.readLock().unlock();
-      HRegion.writeOps.incrementAndGet();
     }
   }
 
@@ -1478,6 +1522,11 @@ public class HRegion implements HeapSize
       this.updatesLock.readLock().unlock();
     }
 
+    // do after lock
+    long after = EnvironmentEdgeManager.currentTimeMillis();
+    String signature = this.createMutationSignature(familyMap.keySet());
+    HRegion.incrTimeVaryingMetric(signature + ".delete_", after - now);
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -1615,6 +1664,10 @@ public class HRegion implements HeapSize
   }
 
   private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+    String signature = null;
+    // variable to note if all Put items are for the same CF -- metrics related
+    boolean isSignatureClear = true;
+
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
     boolean locked = false;
@@ -1660,6 +1713,21 @@ public class HRegion implements HeapSize
         }
         lastIndexExclusive++;
         numReadyToWrite++;
+
+        // if first time around, designate expected signature for metric
+        // else, if all have been consistent so far, check if it still holds
+        // all else, designate failure signature and mark as unclear
+        if (null == signature) {
+          signature = this.createMutationSignature(put.getFamilyMap().keySet());
+        } else {
+          if (isSignatureClear) {
+            if (!signature.equals(this.createMutationSignature(put
+                .getFamilyMap().keySet()))) {
+              isSignatureClear = false;
+              signature = "cf.__unknown";
+            }
+          }
+        }
       }
       // We've now grabbed as many puts off the list as we can
       assert numReadyToWrite > 0;
@@ -1714,6 +1782,14 @@ public class HRegion implements HeapSize
       for (Integer toRelease : acquiredLocks) {
         releaseRowLock(toRelease);
       }
+
+      // do after lock
+      long after = EnvironmentEdgeManager.currentTimeMillis();
+      if (null == signature) {
+        signature = "cf.__badfamily";
+      }
+      HRegion.incrTimeVaryingMetric(signature + ".multiput_", after - now);
+
       if (!success) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
           if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
@@ -1927,6 +2003,12 @@ public class HRegion implements HeapSize
     } finally {
       this.updatesLock.readLock().unlock();
     }
+
+    // do after lock
+    long after = EnvironmentEdgeManager.currentTimeMillis();
+    String signature = this.createMutationSignature(familyMap.keySet());
+    HRegion.incrTimeVaryingMetric(signature + ".put_", after - now);
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -2520,6 +2602,7 @@ public class HRegion implements HeapSize
       }
     }
 
+    @Override
     public synchronized boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       if (this.filterClosed) {
@@ -2547,6 +2630,7 @@ public class HRegion implements HeapSize
       return returnResult;
     }
 
+    @Override
     public synchronized boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default
@@ -2641,6 +2725,7 @@ public class HRegion implements HeapSize
               currentRow, 0, currentRow.length) <= isScan);
     }
 
+    @Override
     public synchronized void close() {
       if (storeHeap != null) {
         storeHeap.close();
@@ -3119,13 +3204,13 @@ public class HRegion implements HeapSize
       if (stats == null || stats.length == 0) {
         return;
       }
-      for (int i = 0; i < stats.length; i++) {
-        String path = stats[i].getPath().toString();
-        if (stats[i].isDir()) {
+      for (FileStatus stat : stats) {
+        String path = stat.getPath().toString();
+        if (stat.isDir()) {
           LOG.debug("d " + path);
-          listPaths(fs, stats[i].getPath());
+          listPaths(fs, stat.getPath());
         } else {
-          LOG.debug("f " + path + " size=" + stats[i].getLen());
+          LOG.debug("f " + path + " size=" + stat.getLen());
         }
       }
     }
@@ -3161,6 +3246,8 @@ public class HRegion implements HeapSize
    * Do a get based on the get parameter.
    */
   private List<KeyValue> get(final Get get) throws IOException {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+
     Scan scan = new Scan(get);
 
     List<KeyValue> results = new ArrayList<KeyValue>();
@@ -3173,6 +3260,12 @@ public class HRegion implements HeapSize
       if (scanner != null)
         scanner.close();
     }
+
+    // do after lock
+    long after = EnvironmentEdgeManager.currentTimeMillis();
+    String signature = this.createMutationSignature(get.familySet());
+    HRegion.incrTimeVaryingMetric(signature + ".get_", after - now);
+
     return results;
   }
 
@@ -3189,6 +3282,9 @@ public class HRegion implements HeapSize
   public long incrementColumnValue(byte [] row, byte [] family,
       byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException {
+    // to be used for metrics
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+
     checkRow(row);
     boolean flush = false;
     // Lock row
@@ -3238,6 +3334,11 @@ public class HRegion implements HeapSize
       HRegion.writeOps.incrementAndGet();
     }
 
+    // do after lock
+    long after = EnvironmentEdgeManager.currentTimeMillis();
+    String signature = this.createMutationSignature(family);
+    HRegion.incrTimeVaryingMetric(signature + ".increment_", after - before);
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -3279,6 +3380,7 @@ public class HRegion implements HeapSize
         (5 * Bytes.SIZEOF_BOOLEAN)) +
         (3 * ClassSize.REENTRANT_LOCK));
 
+  @Override
   public long heapSize() {
     long heapSize = DEEP_OVERHEAD;
     for(Store store : this.stores.values()) {