You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/09/20 19:59:30 UTC
svn commit: r1388141 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
Author: ramkrishna
Date: Thu Sep 20 17:59:30 2012
New Revision: 1388141
URL: http://svn.apache.org/viewvc?rev=1388141&view=rev
Log:
HBASE-6698 Refactor checkAndPut and checkAndDelete to use doMiniBatchMutation
(Priya)
Submitted by:PrIya
Reviewed by:Ram, Stack, Ted, Lars
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1388141&r1=1388140&r2=1388141&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Sep 20 17:59:30 2012
@@ -1783,8 +1783,7 @@ public class HRegion implements HeapSize
try {
// All edits for the given row (across all column families) must happen atomically.
- prepareDelete(delete);
- internalDelete(delete, delete.getClusterId(), writeToWAL);
+ doBatchMutate(delete, lid);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1801,11 +1800,11 @@ public class HRegion implements HeapSize
*/
void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
boolean writeToWAL) throws IOException {
- Delete delete = new Delete();
+ Delete delete = new Delete(HConstants.EMPTY_BYTE_ARRAY);
delete.setFamilyMap(familyMap);
delete.setClusterId(clusterId);
delete.setWriteToWAL(writeToWAL);
- internalDelete(delete, clusterId, writeToWAL);
+ doBatchMutate(delete, null);
}
/**
@@ -1863,65 +1862,6 @@ public class HRegion implements HeapSize
}
/**
- * @param delete The Delete command
- * @param clusterId UUID of the originating cluster (for replication).
- * @param writeToWAL
- * @throws IOException
- */
- private void internalDelete(Delete delete, UUID clusterId,
- boolean writeToWAL) throws IOException {
- Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
- WALEdit walEdit = new WALEdit();
- /* Run coprocessor pre hook outside of locks to avoid deadlock */
- if (coprocessorHost != null) {
- if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
- return;
- }
- }
-
- long now = EnvironmentEdgeManager.currentTimeMillis();
- byte [] byteNow = Bytes.toBytes(now);
- boolean flush = false;
-
- updatesLock.readLock().lock();
- try {
- prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
-
- if (writeToWAL) {
- // write/sync to WAL should happen before we touch memstore.
- //
- // If order is reversed, i.e. we write to memstore first, and
- // for some reason fail to write/sync to commit log, the memstore
- // will contain uncommitted transactions.
- //
- // bunch up all edits across all column families into a
- // single WALEdit.
- addFamilyMapToWALEdit(familyMap, walEdit);
- this.log.append(regionInfo, this.htableDescriptor.getName(),
- walEdit, clusterId, now, this.htableDescriptor);
- }
-
- // Now make changes to the memstore.
- long addedSize = applyFamilyMapToMemstore(familyMap, null);
- flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-
- } finally {
- this.updatesLock.readLock().unlock();
- }
- // do after lock
- if (coprocessorHost != null) {
- coprocessorHost.postDelete(delete, walEdit, writeToWAL);
- }
- final long after = EnvironmentEdgeManager.currentTimeMillis();
- this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now);
-
- if (flush) {
- // Request a cache flush. Do it outside update lock.
- requestFlush();
- }
- }
-
- /**
* @param put
* @throws IOException
*/
@@ -1978,7 +1918,7 @@ public class HRegion implements HeapSize
try {
// All edits for the given row (across all column families) must happen atomically.
- internalPut(put, put.getClusterId(), writeToWAL);
+ doBatchMutate(put, lid);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -2444,7 +2384,6 @@ public class HRegion implements HeapSize
}
startRegionOperation();
- this.writeRequestsCount.increment();
try {
RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
Get get = new Get(row, lock);
@@ -2496,17 +2435,7 @@ public class HRegion implements HeapSize
if (matches) {
// All edits for the given row (across all column families) must
// happen atomically.
- //
- // Using default cluster id, as this can only happen in the
- // originating cluster. A slave cluster receives the result as a Put
- // or Delete
- if (isPut) {
- internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
- } else {
- Delete d = (Delete)w;
- prepareDelete(d);
- internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
- }
+ doBatchMutate((Mutation)w, lid);
this.checkAndMutateChecksPassed.increment();
return true;
}
@@ -2520,6 +2449,18 @@ public class HRegion implements HeapSize
}
}
+ @SuppressWarnings("unchecked")
+ private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+ DoNotRetryIOException {
+ Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] { new Pair<Mutation, Integer>(mutation,
+ lid) };
+ OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
+ if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
+ throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
+ } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
+ throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+ }
+ }
/**
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
@@ -2592,7 +2533,7 @@ public class HRegion implements HeapSize
* @praram now
* @throws IOException
*/
- private void put(byte [] family, List<KeyValue> edits)
+ private void put(byte [] family, List<KeyValue> edits, Integer lid)
throws IOException {
Map<byte[], List<KeyValue>> familyMap;
familyMap = new HashMap<byte[], List<KeyValue>>();
@@ -2602,70 +2543,9 @@ public class HRegion implements HeapSize
p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
p.setWriteToWAL(true);
- this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
+ doBatchMutate(p, lid);
}
-
- /**
- * Add updates first to the hlog (if writeToWal) and then add values to memstore.
- * Warning: Assumption is caller has lock on passed in row.
- * @param put The Put command
- * @param clusterId UUID of the originating cluster (for replication).
- * @param writeToWAL if true, then we should write to the log
- * @throws IOException
- */
- private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
- Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
- WALEdit walEdit = new WALEdit();
- /* run pre put hook outside of lock to avoid deadlock */
- if (coprocessorHost != null) {
- if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
- return;
- }
- }
-
- long now = EnvironmentEdgeManager.currentTimeMillis();
- byte[] byteNow = Bytes.toBytes(now);
- boolean flush = false;
-
- this.updatesLock.readLock().lock();
- try {
- checkFamilies(familyMap.keySet());
- checkTimestamps(familyMap, now);
- updateKVTimestamps(familyMap.values(), byteNow);
- // write/sync to WAL should happen before we touch memstore.
- //
- // If order is reversed, i.e. we write to memstore first, and
- // for some reason fail to write/sync to commit log, the memstore
- // will contain uncommitted transactions.
- if (writeToWAL) {
- addFamilyMapToWALEdit(familyMap, walEdit);
- this.log.append(regionInfo, this.htableDescriptor.getName(),
- walEdit, clusterId, now, this.htableDescriptor);
- } else {
- recordPutWithoutWal(familyMap);
- }
-
- long addedSize = applyFamilyMapToMemstore(familyMap, null);
- flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
- } finally {
- this.updatesLock.readLock().unlock();
- }
-
- if (coprocessorHost != null) {
- coprocessorHost.postPut(put, walEdit, writeToWAL);
- }
-
- // do after lock
- final long after = EnvironmentEdgeManager.currentTimeMillis();
- this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
-
- if (flush) {
- // Request a cache flush. Do it outside update lock.
- requestFlush();
- }
- }
-
+
/**
* Atomically apply the given map of family->edits to the memstore.
* This handles the consistency control on its own, but the caller
@@ -4019,7 +3899,7 @@ public class HRegion implements HeapSize
edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
HConstants.META_VERSION_QUALIFIER, now,
Bytes.toBytes(HConstants.META_VERSION)));
- meta.put(HConstants.CATALOG_FAMILY, edits);
+ meta.put(HConstants.CATALOG_FAMILY, edits, lid);
} finally {
meta.releaseRowLock(lid);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java?rev=1388141&r1=1388140&r2=1388141&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java Thu Sep 20 17:59:30 2012
@@ -189,10 +189,10 @@ public class TestRegionServerMetrics {
assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "append_");
// One delete where the cf is known
- assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "delete_");
+ assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "multidelete_");
// two deletes in the region.
- assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "delete_");
+ assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "multidelete_");
// Three gets. one for gets. One for append. One for increment.
assertTimeVaryingMetricCount(3, TABLE_NAME, cf, regionName, "get_");