You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:21:50 UTC
svn commit: r1181575 -
/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Author: nspiegelberg
Date: Tue Oct 11 02:21:50 2011
New Revision: 1181575
URL: http://svn.apache.org/viewvc?rev=1181575&view=rev
Log:
Added Get/Delete/Put per CF metrics in HRegion.
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181575&r1=1181574&r2=1181575&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:21:50 2011
@@ -46,8 +46,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.print.attribute.standard.Finishings;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -60,12 +58,12 @@ import org.apache.hadoop.hbase.DroppedSn
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -260,6 +258,53 @@ public class HRegion implements HeapSize
new ConcurrentHashMap<String,
Pair<AtomicLong, AtomicInteger>>();
+ /**
+ * Builds the metric-key signature for a set of column families.
+ *
+ * <p>The result always starts with {@code "cf."}. For exactly one family it
+ * is {@code "cf."} followed by {@code Bytes.toString} of that family.
+ * Otherwise the families are appended in the set's iteration order, each
+ * converted with {@code Bytes.toString} and separated by {@code ":"}; an
+ * empty set therefore yields just {@code "cf."}.
+ *
+ * <p>The length guard is evaluated before each family is appended: once the
+ * text built so far is longer than 256 characters, {@code "__more"} is
+ * appended in place of the remaining families and iteration stops. The
+ * returned string can therefore be longer than 256 characters, and a
+ * truncated signature ends in {@code ":__more"}.
+ *
+ * @param families
+ * the column families to encode, as raw bytes; must not be null. The
+ * iteration order of the set determines the order of the names in the
+ * signature.
+ * @return the signature string that callers prefix to a metric name
+ */
+ private String createMutationSignature(Set<byte[]> families) {
+ int limit = families.size(); // families not yet appended; reaches 0 on the last one
+ if (1 == limit) { // single family: no separator or length guard needed
+ return "cf." + Bytes.toString(families.iterator().next());
+ }
+
+ StringBuilder sb = new StringBuilder("cf.");
+
+ int MAX_SIZE = 256; // length threshold, checked before each append
+ for (byte[] family : families) {
+ if (sb.length() > MAX_SIZE) { // already past the threshold: truncate here
+ sb.append("__more");
+ break;
+ }
+
+ --limit;
+ sb.append(Bytes.toString(family));
+ if (0 != limit) { // separator only between families, never after the last
+ sb.append(":");
+ }
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Builds the metric-key signature for a single column family: the literal
+ * {@code "cf."} followed by {@code Bytes.toString(family)}.
+ *
+ * <p>This overload exists so that a caller holding one family does not have
+ * to wrap it in a Set. It produces the same text as the Set overload does for
+ * a one-element set. Restricting the Set overload to more than one family was
+ * considered, but would have put the burden of choosing on the caller.
+ *
+ * @param family
+ * the column family to encode, as raw bytes
+ * @return the signature string that callers prefix to a metric name
+ */
+ private String createMutationSignature(byte[] family) {
+ return "cf." + Bytes.toString(family);
+ }
+
public static void incrNumericMetric(String key, long amount) {
AtomicLong oldVal = numericMetrics.get(key);
if (oldVal == null) {
@@ -1392,7 +1437,6 @@ public class HRegion implements HeapSize
} finally {
if(lockid == null) releaseRowLock(lid);
splitsAndClosesLock.readLock().unlock();
- HRegion.writeOps.incrementAndGet();
}
}
@@ -1478,6 +1522,11 @@ public class HRegion implements HeapSize
this.updatesLock.readLock().unlock();
}
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ String signature = this.createMutationSignature(familyMap.keySet());
+ HRegion.incrTimeVaryingMetric(signature + ".delete_", after - now);
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -1615,6 +1664,10 @@ public class HRegion implements HeapSize
}
private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+ String signature = null;
+ // variable to note if all Put items are for the same CF -- metrics related
+ boolean isSignatureClear = true;
+
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
boolean locked = false;
@@ -1660,6 +1713,21 @@ public class HRegion implements HeapSize
}
lastIndexExclusive++;
numReadyToWrite++;
+
+ // if first time around, designate expected signature for metric
+ // else, if all have been consistent so far, check if it still holds
+ // all else, designate failure signature and mark as unclear
+ if (null == signature) {
+ signature = this.createMutationSignature(put.getFamilyMap().keySet());
+ } else {
+ if (isSignatureClear) {
+ if (!signature.equals(this.createMutationSignature(put
+ .getFamilyMap().keySet()))) {
+ isSignatureClear = false;
+ signature = "cf.__unknown";
+ }
+ }
+ }
}
// We've now grabbed as many puts off the list as we can
assert numReadyToWrite > 0;
@@ -1714,6 +1782,14 @@ public class HRegion implements HeapSize
for (Integer toRelease : acquiredLocks) {
releaseRowLock(toRelease);
}
+
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ if (null == signature) {
+ signature = "cf.__badfamily";
+ }
+ HRegion.incrTimeVaryingMetric(signature + ".multiput_", after - now);
+
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
@@ -1927,6 +2003,12 @@ public class HRegion implements HeapSize
} finally {
this.updatesLock.readLock().unlock();
}
+
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ String signature = this.createMutationSignature(familyMap.keySet());
+ HRegion.incrTimeVaryingMetric(signature + ".put_", after - now);
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -2520,6 +2602,7 @@ public class HRegion implements HeapSize
}
}
+ @Override
public synchronized boolean next(List<KeyValue> outResults, int limit)
throws IOException {
if (this.filterClosed) {
@@ -2547,6 +2630,7 @@ public class HRegion implements HeapSize
return returnResult;
}
+ @Override
public synchronized boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
@@ -2641,6 +2725,7 @@ public class HRegion implements HeapSize
currentRow, 0, currentRow.length) <= isScan);
}
+ @Override
public synchronized void close() {
if (storeHeap != null) {
storeHeap.close();
@@ -3119,13 +3204,13 @@ public class HRegion implements HeapSize
if (stats == null || stats.length == 0) {
return;
}
- for (int i = 0; i < stats.length; i++) {
- String path = stats[i].getPath().toString();
- if (stats[i].isDir()) {
+ for (FileStatus stat : stats) {
+ String path = stat.getPath().toString();
+ if (stat.isDir()) {
LOG.debug("d " + path);
- listPaths(fs, stats[i].getPath());
+ listPaths(fs, stat.getPath());
} else {
- LOG.debug("f " + path + " size=" + stats[i].getLen());
+ LOG.debug("f " + path + " size=" + stat.getLen());
}
}
}
@@ -3161,6 +3246,8 @@ public class HRegion implements HeapSize
* Do a get based on the get parameter.
*/
private List<KeyValue> get(final Get get) throws IOException {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+
Scan scan = new Scan(get);
List<KeyValue> results = new ArrayList<KeyValue>();
@@ -3173,6 +3260,12 @@ public class HRegion implements HeapSize
if (scanner != null)
scanner.close();
}
+
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ String signature = this.createMutationSignature(get.familySet());
+ HRegion.incrTimeVaryingMetric(signature + ".get_", after - now);
+
return results;
}
@@ -3189,6 +3282,9 @@ public class HRegion implements HeapSize
public long incrementColumnValue(byte [] row, byte [] family,
byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
+ // to be used for metrics
+ long before = EnvironmentEdgeManager.currentTimeMillis();
+
checkRow(row);
boolean flush = false;
// Lock row
@@ -3238,6 +3334,11 @@ public class HRegion implements HeapSize
HRegion.writeOps.incrementAndGet();
}
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ String signature = this.createMutationSignature(family);
+ HRegion.incrTimeVaryingMetric(signature + ".increment_", after - before);
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -3279,6 +3380,7 @@ public class HRegion implements HeapSize
(5 * Bytes.SIZEOF_BOOLEAN)) +
(3 * ClassSize.REENTRANT_LOCK));
+ @Override
public long heapSize() {
long heapSize = DEEP_OVERHEAD;
for(Store store : this.stores.values()) {