You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/05/01 22:48:37 UTC
svn commit: r1332811 -
/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Author: larsh
Date: Tue May 1 20:48:37 2012
New Revision: 1332811
URL: http://svn.apache.org/viewvc?rev=1332811&view=rev
Log:
HBASE-5897 prePut coprocessor hook causing substantial CPU usage
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1332811&r1=1332810&r2=1332811&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May 1 20:48:37 2012
@@ -1928,10 +1928,12 @@ public class HRegion implements HeapSize
T[] operations;
int nextIndexToProcess = 0;
OperationStatus[] retCodeDetails;
+ WALEdit[] walEditsFromCoprocessors;
public BatchOperationInProgress(T[] operations) {
this.operations = operations;
this.retCodeDetails = new OperationStatus[operations.length];
+ this.walEditsFromCoprocessors = new WALEdit[operations.length];
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
}
@@ -1968,14 +1970,21 @@ public class HRegion implements HeapSize
BatchOperationInProgress<Pair<Put, Integer>> batchOp =
new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
+ boolean initialized = false;
+
while (!batchOp.isDone()) {
checkReadOnly();
checkResources();
long newSize;
startRegionOperation();
- this.writeRequestsCount.increment();
+
try {
+ if (!initialized) {
+ this.writeRequestsCount.increment();
+ doPrePutHook(batchOp);
+ initialized = true;
+ }
long addedSize = doMiniBatchPut(batchOp);
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
} finally {
@@ -1988,33 +1997,42 @@ public class HRegion implements HeapSize
return batchOp.retCodeDetails;
}
+ private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+ throws IOException {
+ /* Run coprocessor pre hook outside of locks to avoid deadlock */
+ WALEdit walEdit = new WALEdit();
+ if (coprocessorHost != null) {
+ for (int i = 0 ; i < batchOp.operations.length; i++) {
+ Pair<Put, Integer> nextPair = batchOp.operations[i];
+ Put put = nextPair.getFirst();
+ if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
+ // pre hook says skip this Put
+ // mark as success and skip in doMiniBatchPut
+ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+ }
+ if (!walEdit.isEmpty()) {
+ batchOp.walEditsFromCoprocessors[i] = walEdit;
+ walEdit = new WALEdit();
+ }
+ }
+ }
+ }
+
+
@SuppressWarnings("unchecked")
private long doMiniBatchPut(
BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
- final String tableName = getTableDesc().getNameAsString();
-
// variable to note if all Put items are for the same CF -- metrics related
boolean cfSetConsistent = true;
//The set of columnFamilies first seen.
Set<byte[]> cfSet = null;
+
+ WALEdit walEdit = new WALEdit();
long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
- WALEdit walEdit = new WALEdit();
- /* Run coprocessor pre hook outside of locks to avoid deadlock */
- if (coprocessorHost != null) {
- for (int i = 0; i < batchOp.operations.length; i++) {
- Pair<Put, Integer> nextPair = batchOp.operations[i];
- Put put = nextPair.getFirst();
- if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
- // pre hook says skip this Put
- // mark as success and skip below
- batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
- }
- }
- }
MultiVersionConsistencyControl.WriteEntry w = null;
long txid = 0;
@@ -2152,7 +2170,15 @@ public class HRegion implements HeapSize
Put p = batchOp.operations[i].getFirst();
if (!p.getWriteToWAL()) continue;
+ // Add WAL edits by CP
+ WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+ if (fromCP != null) {
+ for (KeyValue kv : fromCP.getKeyValues()) {
+ walEdit.add(kv);
+ }
+ }
addFamilyMapToWALEdit(familyMaps[i], walEdit);
+
}
// -------------------------