You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/07/17 07:33:51 UTC
svn commit: r1504000 - in /hbase/branches/0.95:
hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/
hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ h...
Author: larsh
Date: Wed Jul 17 05:33:50 2013
New Revision: 1504000
URL: http://svn.apache.org/r1504000
Log:
HBASE-8877 Reentrant row locks (Dave Latham)
Modified:
hbase/branches/0.95/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
Modified: hbase/branches/0.95/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java (original)
+++ hbase/branches/0.95/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java Wed Jul 17 05:33:50 2013
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@@ -146,20 +145,19 @@ public class BulkDeleteEndpoint extends
}
}
if (deleteRows.size() > 0) {
- Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+ Mutation[] deleteArr = new Mutation[deleteRows.size()];
int i = 0;
for (List<KeyValue> deleteRow : deleteRows) {
- Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
- deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+ deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp);
}
- OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+ OperationStatus[] opStatus = region.batchMutate(deleteArr);
for (i = 0; i < opStatus.length; i++) {
if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
break;
}
totalRowsDeleted++;
if (deleteType == DeleteType.VERSION) {
- byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+ byte[] versionsDeleted = deleteArr[i].getAttribute(
NO_OF_VERSIONS_TO_DELETE);
if (versionsDeleted != null) {
totalVersionsDeleted += Bytes.toInt(versionsDeleted);
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Wed Jul 17 05:33:50 2013
@@ -253,12 +253,12 @@ public abstract class BaseRegionObserver
@Override
public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
}
@Override
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
}
@Override
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Jul 17 05:33:50 2013
@@ -554,7 +554,7 @@ public interface RegionObserver extends
* @throws IOException if an error occurred on the coprocessor
*/
void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
/**
* This will be called after applying a batch of Mutations on a region. The Mutations are added to
@@ -564,7 +564,7 @@ public interface RegionObserver extends
* @throws IOException if an error occurred on the coprocessor
*/
void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException;
/**
* Called before checkAndPut
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jul 17 05:33:50 2013
@@ -34,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
-import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
@@ -138,6 +137,7 @@ import org.apache.hadoop.io.MultipleIOEx
import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -223,12 +223,13 @@ public class HRegion implements HeapSize
// Members
//////////////////////////////////////////////////////////////////////////////
- private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
- new ConcurrentHashMap<HashedBytes, CountDownLatch>();
- private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
- new ConcurrentHashMap<Integer, HashedBytes>();
- private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
- static private Random rand = new Random();
+ // map from a locked row to the context for that lock including:
+ // - CountDownLatch for threads waiting on that row
+ // - the thread that owns the lock (allow reentrancy)
+ // - reference count of (reentrant) locks held by the thread
+ // - the row itself
+ private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+ new ConcurrentHashMap<HashedBytes, RowLockContext>();
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
Bytes.BYTES_RAWCOMPARATOR);
@@ -1780,7 +1781,7 @@ public class HRegion implements HeapSize
try {
delete.getRow();
// All edits for the given row (across all column families) must happen atomically.
- doBatchMutate(delete, null);
+ doBatchMutate(delete);
} finally {
closeRegionOperation();
}
@@ -1803,7 +1804,7 @@ public class HRegion implements HeapSize
delete.setFamilyMap(familyMap);
delete.setClusterId(clusterId);
delete.setDurability(durability);
- doBatchMutate(delete, null);
+ doBatchMutate(delete);
}
/**
@@ -1878,7 +1879,7 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
try {
// All edits for the given row (across all column families) must happen atomically.
- doBatchMutate(put, null);
+ doBatchMutate(put);
} finally {
closeRegionOperation();
}
@@ -1908,46 +1909,29 @@ public class HRegion implements HeapSize
}
/**
- * Perform a batch put with no pre-specified locks
- * @see HRegion#batchMutate(Pair[])
- */
- public OperationStatus[] put(Put[] puts) throws IOException {
- @SuppressWarnings("unchecked")
- Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
-
- for (int i = 0; i < puts.length; i++) {
- putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
- }
- return batchMutate(putsAndLocks);
- }
-
- /**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
- * @param mutationsAndLocks
- * the list of mutations paired with their requested lock IDs.
+ * @param mutations the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- public OperationStatus[] batchMutate(
- Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
- return batchMutate(mutationsAndLocks, false);
+ public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
+ return batchMutate(mutations, false);
}
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
- * @param mutationsAndLocks
- * the list of mutations paired with their requested lock IDs.
+ * @param mutations the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- OperationStatus[] batchMutate(Pair<Mutation, Integer>[] mutationsAndLocks, boolean isReplay)
+ OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
throws IOException {
- BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
- new BatchOperationInProgress<Pair<Mutation,Integer>>(mutationsAndLocks);
+ BatchOperationInProgress<Mutation> batchOp =
+ new BatchOperationInProgress<Mutation>(mutations);
boolean initialized = false;
@@ -1985,14 +1969,13 @@ public class HRegion implements HeapSize
}
- private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
+ private void doPreMutationHook(BatchOperationInProgress<Mutation> batchOp)
throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */
WALEdit walEdit = new WALEdit();
if (coprocessorHost != null) {
for (int i = 0 ; i < batchOp.operations.length; i++) {
- Pair<Mutation, Integer> nextPair = batchOp.operations[i];
- Mutation m = nextPair.getFirst();
+ Mutation m = batchOp.operations[i];
if (m instanceof Put) {
if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
// pre hook says skip this Put
@@ -2021,7 +2004,7 @@ public class HRegion implements HeapSize
}
@SuppressWarnings("unchecked")
- private long doMiniBatchMutation(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+ private long doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,
boolean isInReplay) throws IOException {
// variable to note if all Put items are for the same CF -- metrics related
@@ -2040,7 +2023,7 @@ public class HRegion implements HeapSize
boolean locked = false;
/** Keep track of the locks we hold so we can release them in finally clause */
- List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+ List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<? extends Cell>>[] familyMaps = new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
@@ -2056,10 +2039,8 @@ public class HRegion implements HeapSize
int numReadyToWrite = 0;
long now = EnvironmentEdgeManager.currentTimeMillis();
while (lastIndexExclusive < batchOp.operations.length) {
- Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
- Mutation mutation = nextPair.getFirst();
+ Mutation mutation = batchOp.operations[lastIndexExclusive];
boolean isPutMutation = mutation instanceof Put;
- Integer providedLockId = nextPair.getSecond();
Map<byte[], List<? extends Cell>> familyMap = mutation.getFamilyMap();
// store the family map reference to allow for mutations
@@ -2097,25 +2078,25 @@ public class HRegion implements HeapSize
lastIndexExclusive++;
continue;
}
+
// If we haven't got any rows in our batch, we should block to
// get the next one.
boolean shouldBlock = numReadyToWrite == 0;
- Integer acquiredLockId = null;
+ RowLock rowLock = null;
try {
- acquiredLockId = getLock(providedLockId, mutation.getRow(),
- shouldBlock);
+ rowLock = getRowLock(mutation.getRow(), shouldBlock);
} catch (IOException ioe) {
LOG.warn("Failed getting lock in batch put, row="
- + Bytes.toStringBinary(mutation.getRow()), ioe);
+ + Bytes.toStringBinary(mutation.getRow()), ioe);
}
- if (acquiredLockId == null) {
+ if (rowLock == null) {
// We failed to grab another lock
assert !shouldBlock : "Should never fail to get lock when blocking";
break; // stop acquiring more rows for this batch
+ } else {
+ acquiredRowLocks.add(rowLock);
}
- if (providedLockId == null) {
- acquiredLocks.add(acquiredLockId);
- }
+
lastIndexExclusive++;
numReadyToWrite++;
@@ -2157,7 +2138,7 @@ public class HRegion implements HeapSize
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) continue;
- Mutation mutation = batchOp.operations[i].getFirst();
+ Mutation mutation = batchOp.operations[i];
if (mutation instanceof Put) {
updateKVTimestamps(familyMaps[i].values(), byteNow);
noOfPuts++;
@@ -2178,8 +2159,8 @@ public class HRegion implements HeapSize
// calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
- new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp =
+ new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
}
@@ -2214,7 +2195,7 @@ public class HRegion implements HeapSize
}
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
- Mutation m = batchOp.operations[i].getFirst();
+ Mutation m = batchOp.operations[i];
Durability tmpDur = getEffectiveDurability(m.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
@@ -2237,10 +2218,10 @@ public class HRegion implements HeapSize
// -------------------------
// STEP 5. Append the edit to WAL. Do not sync wal.
// -------------------------
- Mutation first = batchOp.operations[firstIndex].getFirst();
+ Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
- walEdit, first.getClusterId(), now, this.htableDescriptor);
+ walEdit, mutation.getClusterId(), now, this.htableDescriptor);
}
// -------------------------------
@@ -2250,12 +2231,8 @@ public class HRegion implements HeapSize
this.updatesLock.readLock().unlock();
locked = false;
}
- if (acquiredLocks != null) {
- for (Integer toRelease : acquiredLocks) {
- releaseRowLock(toRelease);
- }
- acquiredLocks = null;
- }
+ releaseRowLocks(acquiredRowLocks);
+
// -------------------------
// STEP 7. Sync wal.
// -------------------------
@@ -2265,8 +2242,8 @@ public class HRegion implements HeapSize
walSyncSuccessful = true;
// calling the post CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) {
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
- new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp =
+ new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp);
}
@@ -2290,7 +2267,7 @@ public class HRegion implements HeapSize
!= OperationStatusCode.SUCCESS) {
continue;
}
- Mutation m = batchOp.operations[i].getFirst();
+ Mutation m = batchOp.operations[i];
if (m instanceof Put) {
coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
} else {
@@ -2312,12 +2289,7 @@ public class HRegion implements HeapSize
if (locked) {
this.updatesLock.readLock().unlock();
}
-
- if (acquiredLocks != null) {
- for (Integer toRelease : acquiredLocks) {
- releaseRowLock(toRelease);
- }
- }
+ releaseRowLocks(acquiredRowLocks);
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
@@ -2394,8 +2366,8 @@ public class HRegion implements HeapSize
checkFamily(family);
get.addColumn(family, qualifier);
- // Lock row
- Integer lid = getLock(null, get.getRow(), true);
+ // Lock row - note that doBatchMutate will relock this row if called
+ RowLock rowLock = getRowLock(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
List<KeyValue> result = null;
@@ -2441,27 +2413,23 @@ public class HRegion implements HeapSize
if (matches) {
// All edits for the given row (across all column families) must
// happen atomically.
- doBatchMutate((Mutation)w, lid);
+ doBatchMutate((Mutation)w);
this.checkAndMutateChecksPassed.increment();
return true;
}
this.checkAndMutateChecksFailed.increment();
return false;
} finally {
- releaseRowLock(lid);
+ rowLock.release();
}
} finally {
closeRegionOperation();
}
}
- @SuppressWarnings("unchecked")
- private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+ private void doBatchMutate(Mutation mutation) throws IOException,
org.apache.hadoop.hbase.exceptions.DoNotRetryIOException {
- Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] {
- new Pair<Mutation, Integer>(mutation, lid)
- };
- OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
+ OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -2637,7 +2605,7 @@ public class HRegion implements HeapSize
Put p = new Put(row);
p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
- doBatchMutate(p, null);
+ doBatchMutate(p);
}
/**
@@ -2688,7 +2656,7 @@ public class HRegion implements HeapSize
* called when a Put/Delete has updated memstore but subequently fails to update
* the wal. This method is then invoked to rollback the memstore.
*/
- private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+ private void rollbackMemstore(BatchOperationInProgress<Mutation> batchOp,
Map<byte[], List<? extends Cell>>[] familyMaps,
int start, int end) {
int kvsRolledback = 0;
@@ -3198,138 +3166,76 @@ public class HRegion implements HeapSize
}
/**
- * Obtain a lock on the given row. Blocks until success.
- *
- * I know it's strange to have two mappings:
- * <pre>
- * ROWS ==> LOCKS
- * </pre>
- * as well as
- * <pre>
- * LOCKS ==> ROWS
- * </pre>
- * <p>It would be more memory-efficient to just have one mapping;
- * maybe we'll do that in the future.
- *
- * @param row Name of row to lock.
- * @throws IOException
- * @return The id of the held lock.
- */
- public Integer obtainRowLock(final byte [] row) throws IOException {
- startRegionOperation();
- this.writeRequestsCount.increment();
- try {
- return internalObtainRowLock(row, true);
- } finally {
- closeRegionOperation();
- }
- }
-
- /**
- * Obtains or tries to obtain the given row lock.
+ * Tries to acquire a lock on the given row.
* @param waitForLock if true, will block until the lock is available.
* Otherwise, just tries to obtain the lock and returns
- * null if unavailable.
+ * null if unavailable.
+ * @return the row lock if acquired,
+ * null if waitForLock was false and the lock was not acquired
+ * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
*/
- private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
- throws IOException {
+ public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
checkRow(row, "row lock");
startRegionOperation();
try {
HashedBytes rowKey = new HashedBytes(row);
- CountDownLatch rowLatch = new CountDownLatch(1);
+ RowLockContext rowLockContext = new RowLockContext(rowKey);
// loop until we acquire the row lock (unless !waitForLock)
while (true) {
- CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
- if (existingLatch == null) {
+ RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+ if (existingContext == null) {
+ // Row is not already locked by any thread, use newly created context.
+ break;
+ } else if (existingContext.ownedByCurrentThread()) {
+ // Row is already locked by current thread, reuse existing context instead.
+ rowLockContext = existingContext;
break;
} else {
- // row already locked
+ // Row is already locked by some other thread, give up or wait for it
if (!waitForLock) {
return null;
}
try {
- if (!existingLatch.await(this.rowLockWaitDuration,
- TimeUnit.MILLISECONDS)) {
- throw new IOException("Timed out on getting lock for row="
- + Bytes.toStringBinary(row));
+ if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+ throw new IOException("Timed out waiting for lock for row: " + rowKey);
}
} catch (InterruptedException ie) {
- LOG.warn("internalObtainRowLock interrupted for row=" + Bytes.toStringBinary(row));
+ LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
}
}
-
- // loop until we generate an unused lock id
- while (true) {
- Integer lockId = lockIdGenerator.incrementAndGet();
- HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
- if (existingRowKey == null) {
- return lockId;
- } else {
- // lockId already in use, jump generator to a new spot
- lockIdGenerator.set(rand.nextInt());
- }
- }
+
+ // allocate new lock for this thread
+ return rowLockContext.newLock();
} finally {
closeRegionOperation();
}
}
/**
- * Release the row lock!
- * @param lockId The lock ID to release.
- */
- public void releaseRowLock(final Integer lockId) {
- if (lockId == null) return; // null lock id, do nothing
- HashedBytes rowKey = lockIds.remove(lockId);
- if (rowKey == null) {
- LOG.warn("Release unknown lockId: " + lockId);
- return;
- }
- CountDownLatch rowLatch = lockedRows.remove(rowKey);
- if (rowLatch == null) {
- LOG.error("Releases row not locked, lockId: " + lockId + " row: "
- + rowKey);
- return;
- }
- rowLatch.countDown();
- }
-
- /**
- * See if row is currently locked.
- * @param lockId
- * @return boolean
+ * Acquires a lock on the given row.
+ * The same thread may acquire multiple locks on the same row.
+ * @return the acquired row lock
+ * @throws IOException if the lock could not be acquired after waiting
*/
- boolean isRowLocked(final Integer lockId) {
- return lockIds.containsKey(lockId);
+ public RowLock getRowLock(byte[] row) throws IOException {
+ return getRowLock(row, true);
}
/**
- * Returns existing row lock if found, otherwise
- * obtains a new row lock and returns it.
- * @param lockid requested by the user, or null if the user didn't already hold lock
- * @param row the row to lock
- * @param waitForLock if true, will block until the lock is available, otherwise will
- * simply return null if it could not acquire the lock.
- * @return lockid or null if waitForLock is false and the lock was unavailable.
+ * If the given list of row locks is not null, releases all locks and then clears the list.
*/
- public Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
- throws IOException {
- Integer lid = null;
- if (lockid == null) {
- lid = internalObtainRowLock(row, waitForLock);
- } else {
- if (!isRowLocked(lockid)) {
- throw new IOException("Invalid row lock");
+ public void releaseRowLocks(List<RowLock> rowLocks) {
+ if (rowLocks != null) {
+ for (RowLock rowLock : rowLocks) {
+ rowLock.release();
}
- lid = lockid;
+ rowLocks.clear();
}
- return lid;
}
/**
@@ -4599,24 +4505,19 @@ public class HRegion implements HeapSize
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
boolean locked = false;
boolean walSyncSuccessful = false;
- List<Integer> acquiredLocks = null;
+ List<RowLock> acquiredRowLocks = null;
long addedSize = 0;
List<KeyValue> mutations = new ArrayList<KeyValue>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
try {
// 2. Acquire the row lock(s)
- acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+ acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
for (byte[] row : rowsToLock) {
- // Attempt to lock all involved rows, fail if one lock times out
- Integer lid = getLock(null, row, true);
- if (lid == null) {
- throw new IOException("Failed to acquire lock on "
- + Bytes.toStringBinary(row));
- }
- acquiredLocks.add(lid);
+ // Attempt to lock all involved rows, throw if any lock times out
+ acquiredRowLocks.add(getRowLock(row));
}
// 3. Region lock
- lock(this.updatesLock.readLock(), acquiredLocks.size());
+ lock(this.updatesLock.readLock(), acquiredRowLocks.size());
locked = true;
long now = EnvironmentEdgeManager.currentTimeMillis();
@@ -4651,12 +4552,8 @@ public class HRegion implements HeapSize
}
// 9. Release row lock(s)
- if (acquiredLocks != null) {
- for (Integer lid : acquiredLocks) {
- releaseRowLock(lid);
- }
- acquiredLocks = null;
- }
+ releaseRowLocks(acquiredRowLocks);
+
// 10. Sync edit log
if (txid != 0) {
syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
@@ -4681,12 +4578,8 @@ public class HRegion implements HeapSize
this.updatesLock.readLock().unlock();
locked = false;
}
- if (acquiredLocks != null) {
- for (Integer lid : acquiredLocks) {
- releaseRowLock(lid);
- }
- }
-
+ // release locks if some were acquired but another timed out
+ releaseRowLocks(acquiredRowLocks);
}
// 12. Run post-process hook
@@ -4781,125 +4674,129 @@ public class HRegion implements HeapSize
startRegionOperation(Operation.APPEND);
this.writeRequestsCount.increment();
WriteEntry w = null;
+ RowLock rowLock = null;
try {
- Integer lid = getLock(null, row, true);
- lock(this.updatesLock.readLock());
- // wait for all prior MVCC transactions to finish - while we hold the row lock
- // (so that we are guaranteed to see the latest state)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
- // now start my own transaction
- w = mvcc.beginMemstoreInsert();
+ rowLock = getRowLock(row);
try {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- // Process each family
- for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
-
- Store store = stores.get(family.getKey());
- List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
-
- // Get previous values for all columns in this family
- Get get = new Get(row);
- for (Cell cell : family.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- get.addColumn(family.getKey(), kv.getQualifier());
- }
- List<KeyValue> results = get(get, false);
-
- // Iterate the input columns and update existing values if they were
- // found, otherwise add new column initialized to the append value
-
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
- // Would be nice if KeyValue had scatter/gather logic
- int idx = 0;
- for (Cell cell : family.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- KeyValue newKV;
- if (idx < results.size()
- && results.get(idx).matchingQualifier(kv.getBuffer(),
- kv.getQualifierOffset(), kv.getQualifierLength())) {
- KeyValue oldKv = results.get(idx);
- // allocate an empty kv once
- newKV = new KeyValue(row.length, kv.getFamilyLength(),
- kv.getQualifierLength(), now, KeyValue.Type.Put,
- oldKv.getValueLength() + kv.getValueLength());
- // copy in the value
- System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
- newKV.getBuffer(), newKV.getValueOffset(),
- oldKv.getValueLength());
- System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
- newKV.getBuffer(),
- newKV.getValueOffset() + oldKv.getValueLength(),
- kv.getValueLength());
- idx++;
- } else {
- // allocate an empty kv once
- newKV = new KeyValue(row.length, kv.getFamilyLength(),
- kv.getQualifierLength(), now, KeyValue.Type.Put,
- kv.getValueLength());
- // copy in the value
- System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
- newKV.getBuffer(), newKV.getValueOffset(),
- kv.getValueLength());
+ lock(this.updatesLock.readLock());
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
+ try {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // Process each family
+ for (Map.Entry<byte[], List<? extends Cell>> family : append.getFamilyMap().entrySet()) {
+
+ Store store = stores.get(family.getKey());
+ List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+
+ // Get previous values for all columns in this family
+ Get get = new Get(row);
+ for (Cell cell : family.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ get.addColumn(family.getKey(), kv.getQualifier());
}
- // copy in row, family, and qualifier
- System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
- newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
- System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
- newKV.getBuffer(), newKV.getFamilyOffset(),
- kv.getFamilyLength());
- System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
- newKV.getBuffer(), newKV.getQualifierOffset(),
- kv.getQualifierLength());
-
- newKV.setMemstoreTS(w.getWriteNumber());
- kvs.add(newKV);
-
- // Append update to WAL
- if (writeToWAL) {
- if (walEdits == null) {
- walEdits = new WALEdit();
+ List<KeyValue> results = get(get, false);
+
+ // Iterate the input columns and update existing values if they were
+ // found, otherwise add new column initialized to the append value
+
+ // Avoid as much copying as possible. Every byte is copied at most
+ // once.
+ // Would be nice if KeyValue had scatter/gather logic
+ int idx = 0;
+ for (Cell cell : family.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ KeyValue newKV;
+ if (idx < results.size()
+ && results.get(idx).matchingQualifier(kv.getBuffer(),
+ kv.getQualifierOffset(), kv.getQualifierLength())) {
+ KeyValue oldKv = results.get(idx);
+ // allocate an empty kv once
+ newKV = new KeyValue(row.length, kv.getFamilyLength(),
+ kv.getQualifierLength(), now, KeyValue.Type.Put,
+ oldKv.getValueLength() + kv.getValueLength());
+ // copy in the value
+ System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
+ newKV.getBuffer(), newKV.getValueOffset(),
+ oldKv.getValueLength());
+ System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+ newKV.getBuffer(),
+ newKV.getValueOffset() + oldKv.getValueLength(),
+ kv.getValueLength());
+ idx++;
+ } else {
+ // allocate an empty kv once
+ newKV = new KeyValue(row.length, kv.getFamilyLength(),
+ kv.getQualifierLength(), now, KeyValue.Type.Put,
+ kv.getValueLength());
+ // copy in the value
+ System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+ newKV.getBuffer(), newKV.getValueOffset(),
+ kv.getValueLength());
+ }
+ // copy in row, family, and qualifier
+ System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
+ newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
+ System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
+ newKV.getBuffer(), newKV.getFamilyOffset(),
+ kv.getFamilyLength());
+ System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
+ newKV.getBuffer(), newKV.getQualifierOffset(),
+ kv.getQualifierLength());
+
+ newKV.setMemstoreTS(w.getWriteNumber());
+ kvs.add(newKV);
+
+ // Append update to WAL
+ if (writeToWAL) {
+ if (walEdits == null) {
+ walEdits = new WALEdit();
+ }
+ walEdits.add(newKV);
}
- walEdits.add(newKV);
}
- }
-
- //store the kvs to the temporary memstore before writing HLog
- tempMemstore.put(store, kvs);
- }
-
- // Actually write to WAL now
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the orginating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
- walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
- this.htableDescriptor);
- } else {
- recordMutationWithoutWal(append.getFamilyMap());
- }
-
- //Actually write to Memstore now
- for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- if (store.getFamily().getMaxVersions() == 1) {
- // upsert if VERSIONS for this CF == 1
- size += store.upsert(entry.getValue(), getSmallestReadPoint());
+
+ //store the kvs to the temporary memstore before writing HLog
+ tempMemstore.put(store, kvs);
+ }
+
+ // Actually write to WAL now
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
+ walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+ this.htableDescriptor);
} else {
- // otherwise keep older versions around
- for (Cell cell: entry.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- size += store.add(kv);
+ recordMutationWithoutWal(append.getFamilyMap());
+ }
+
+ //Actually write to Memstore now
+ for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (Cell cell: entry.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ size += store.add(kv);
+ }
}
+ allKVs.addAll(entry.getValue());
}
- allKVs.addAll(entry.getValue());
+ size = this.addAndGetGlobalMemstoreSize(size);
+ flush = isFlushSize(size);
+ } finally {
+ this.updatesLock.readLock().unlock();
}
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
} finally {
- this.updatesLock.readLock().unlock();
- releaseRowLock(lid);
+ rowLock.release();
}
if (writeToWAL) {
// sync the transaction log outside the rowlock
@@ -4952,100 +4849,103 @@ public class HRegion implements HeapSize
this.writeRequestsCount.increment();
WriteEntry w = null;
try {
- Integer lid = getLock(null, row, true);
- lock(this.updatesLock.readLock());
- // wait for all prior MVCC transactions to finish - while we hold the row lock
- // (so that we are guaranteed to see the latest state)
- mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
- // now start my own transaction
- w = mvcc.beginMemstoreInsert();
+ RowLock rowLock = getRowLock(row);
try {
- long now = EnvironmentEdgeManager.currentTimeMillis();
- // Process each family
- for (Map.Entry<byte [], List<? extends Cell>> family:
- increment.getFamilyMap().entrySet()) {
-
- Store store = stores.get(family.getKey());
- List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
-
- // Get previous values for all columns in this family
- Get get = new Get(row);
- for (Cell cell: family.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- get.addColumn(family.getKey(), kv.getQualifier());
- }
- get.setTimeRange(tr.getMin(), tr.getMax());
- List<KeyValue> results = get(get, false);
-
- // Iterate the input columns and update existing values if they were
- // found, otherwise add new column initialized to the increment amount
- int idx = 0;
- for (Cell cell: family.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- long amount = Bytes.toLong(kv.getValue());
- byte [] qualifier = kv.getQualifier();
- if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
- kv = results.get(idx);
- if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
- amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
- } else {
- // throw DoNotRetryIOException instead of IllegalArgumentException
- throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
- "Attempted to increment field that isn't 64 bits wide");
- }
- idx++;
+ lock(this.updatesLock.readLock());
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
+ try {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // Process each family
+ for (Map.Entry<byte [], List<? extends Cell>> family:
+ increment.getFamilyMap().entrySet()) {
+
+ Store store = stores.get(family.getKey());
+ List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
+
+ // Get previous values for all columns in this family
+ Get get = new Get(row);
+ for (Cell cell: family.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ get.addColumn(family.getKey(), kv.getQualifier());
}
-
- // Append new incremented KeyValue to list
- KeyValue newKV =
- new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
- newKV.setMemstoreTS(w.getWriteNumber());
- kvs.add(newKV);
-
- // Prepare WAL updates
- if (writeToWAL) {
- if (walEdits == null) {
- walEdits = new WALEdit();
+ get.setTimeRange(tr.getMin(), tr.getMax());
+ List<KeyValue> results = get(get, false);
+
+ // Iterate the input columns and update existing values if they were
+ // found, otherwise add new column initialized to the increment amount
+ int idx = 0;
+ for (Cell cell: family.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ long amount = Bytes.toLong(kv.getValue());
+ byte [] qualifier = kv.getQualifier();
+ if (idx < results.size() && results.get(idx).matchingQualifier(qualifier)) {
+ kv = results.get(idx);
+ if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
+ amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
+ } else {
+ // throw DoNotRetryIOException instead of IllegalArgumentException
+ throw new org.apache.hadoop.hbase.exceptions.DoNotRetryIOException(
+ "Attempted to increment field that isn't 64 bits wide");
+ }
+ idx++;
+ }
+
+ // Append new incremented KeyValue to list
+ KeyValue newKV =
+ new KeyValue(row, family.getKey(), qualifier, now, Bytes.toBytes(amount));
+ newKV.setMemstoreTS(w.getWriteNumber());
+ kvs.add(newKV);
+
+ // Prepare WAL updates
+ if (writeToWAL) {
+ if (walEdits == null) {
+ walEdits = new WALEdit();
+ }
+ walEdits.add(newKV);
}
- walEdits.add(newKV);
}
- }
-
- //store the kvs to the temporary memstore before writing HLog
- tempMemstore.put(store, kvs);
- }
-
- // Actually write to WAL now
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the orginating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
- walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
- this.htableDescriptor);
- } else {
- recordMutationWithoutWal(increment.getFamilyMap());
- }
- //Actually write to Memstore now
- for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- if (store.getFamily().getMaxVersions() == 1) {
- // upsert if VERSIONS for this CF == 1
- size += store.upsert(entry.getValue(), getSmallestReadPoint());
+
+ //store the kvs to the temporary memstore before writing HLog
+ tempMemstore.put(store, kvs);
+ }
+
+ // Actually write to WAL now
+ if (writeToWAL) {
+ // Using default cluster id, as this can only happen in the originating
+ // cluster. A slave cluster receives the final value (not the delta)
+ // as a Put.
+ txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
+ walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+ this.htableDescriptor);
} else {
- // otherwise keep older versions around
- for (Cell cell : entry.getValue()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- size += store.add(kv);
+ recordMutationWithoutWal(increment.getFamilyMap());
+ }
+ //Actually write to Memstore now
+ for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (Cell cell : entry.getValue()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ size += store.add(kv);
+ }
}
+ allKVs.addAll(entry.getValue());
}
- allKVs.addAll(entry.getValue());
+ size = this.addAndGetGlobalMemstoreSize(size);
+ flush = isFlushSize(size);
+ } finally {
+ this.updatesLock.readLock().unlock();
}
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
} finally {
- this.updatesLock.readLock().unlock();
- releaseRowLock(lid);
+ rowLock.release();
}
if (writeToWAL) {
// sync the transaction log outside the rowlock
@@ -5085,22 +4985,32 @@ public class HRegion implements HeapSize
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (12 * Bytes.SIZEOF_LONG) +
- 2 * Bytes.SIZEOF_BOOLEAN);
-
+ 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ (11 * Bytes.SIZEOF_LONG) +
+ 4 * Bytes.SIZEOF_BOOLEAN);
+
+ // woefully out of date - currently missing:
+ // 1 x HashMap - coprocessorServiceHandlers
+ // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
+ // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
+ // writeRequestsCount, updatesBlockedMs
+ // 1 x HRegion$WriteState - writestate
+ // 1 x RegionCoprocessorHost - coprocessorHost
+ // 1 x RegionSplitPolicy - splitPolicy
+ // 1 x MetricsRegion - metricsRegion
+ // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
ClassSize.OBJECT + // closeLock
(2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
(3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
- ClassSize.ATOMIC_INTEGER + // lockIdGenerator
- (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, lockIds, scannerReadPoints
+ (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints
WriteState.HEAP_SIZE + // writestate
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
ClassSize.ARRAYLIST + // recentFlushes
MultiVersionConsistencyControl.FIXED_SIZE // mvcc
+ ClassSize.TREEMAP // maxSeqIdInStores
+ + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
;
@Override
@@ -5109,7 +5019,7 @@ public class HRegion implements HeapSize
for (Store store : this.stores.values()) {
heapSize += store.heapSize();
}
- // this does not take into account row locks, recent flushes, mvcc entries
+ // this does not take into account row locks, recent flushes, mvcc entries, and more
return heapSize;
}
@@ -5673,4 +5583,68 @@ public class HRegion implements HeapSize
*/
void failedBulkLoad(byte[] family, String srcPath) throws IOException;
}
+
+ @VisibleForTesting class RowLockContext {
+ private final HashedBytes row;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final Thread thread;
+ private int lockCount = 0;
+
+ RowLockContext(HashedBytes row) {
+ this.row = row;
+ this.thread = Thread.currentThread();
+ }
+
+ boolean ownedByCurrentThread() {
+ return thread == Thread.currentThread();
+ }
+
+ RowLock newLock() {
+ lockCount++;
+ return new RowLock(this);
+ }
+
+ void releaseLock() {
+ if (!ownedByCurrentThread()) {
+ throw new IllegalArgumentException("Lock held by thread: " + thread
+ + " cannot be released by different thread: " + Thread.currentThread());
+ }
+ lockCount--;
+ if (lockCount == 0) {
+ // no remaining locks by the thread, unlock and allow other threads to access
+ RowLockContext existingContext = lockedRows.remove(row);
+ if (existingContext != this) {
+ throw new RuntimeException(
+ "Internal row lock state inconsistent, should not happen, row: " + row);
+ }
+ latch.countDown();
+ }
+ }
+ }
+
+ /**
+ * Row lock held by a given thread.
+ * One thread may acquire multiple locks on the same row simultaneously.
+ * The locks must be released by calling release() from the same thread.
+ */
+ public class RowLock {
+ @VisibleForTesting final RowLockContext context;
+ private boolean released = false;
+
+ @VisibleForTesting RowLock(RowLockContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Release this lock. If there are no remaining locks held by the current thread
+ * then unlock the row and allow other threads to acquire the lock.
+ * @throws IllegalArgumentException if called by a different thread than the lock owning thread
+ */
+ public void release() {
+ if (!released) {
+ context.releaseLock();
+ released = true;
+ }
+ }
+ }
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jul 17 05:33:50 2013
@@ -3956,8 +3956,7 @@ public class HRegionServer implements Cl
*/
protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
- @SuppressWarnings("unchecked")
- Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
+ Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@@ -3974,7 +3973,7 @@ public class HRegionServer implements Cl
mutation = ProtobufUtil.toDelete(m, cells);
batchContainsDelete = true;
}
- mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
+ mArray[i++] = mutation;
builder.addResult(result);
}
@@ -3983,7 +3982,7 @@ public class HRegionServer implements Cl
cacheFlusher.reclaimMemStoreMemory();
}
- OperationStatus codes[] = region.batchMutate(mutationsWithLocks, isReplay);
+ OperationStatus codes[] = region.batchMutate(mArray);
for (i = 0; i < codes.length; i++) {
switch (codes[i].getOperationStatusCode()) {
case BAD_FAMILY:
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Jul 17 05:33:50 2013
@@ -993,7 +993,7 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public boolean preBatchMutate(
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
boolean bypass = false;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
@@ -1018,7 +1018,7 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postBatchMutate(
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Wed Jul 17 05:33:50 2013
@@ -1061,7 +1061,7 @@ public class HBaseFsck extends Configure
"You may need to restore the previously sidelined .META.");
return false;
}
- meta.put(puts.toArray(new Put[0]));
+ meta.batchMutate(puts.toArray(new Put[0]));
HRegion.closeHRegion(meta);
LOG.info("Success! .META. table rebuilt.");
LOG.info("Old .META. is moved into " + backupDir);
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Wed Jul 17 05:33:50 2013
@@ -407,7 +407,7 @@ public class SimpleRegionObserver extend
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -417,7 +417,7 @@ public class SimpleRegionObserver extend
@Override
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
- final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e);
assertNotNull(e.getRegion());
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Wed Jul 17 05:33:50 2013
@@ -59,11 +59,8 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.Pair;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Lists;
-
/**
* Testing of HRegion.incrementColumnValue, HRegion.increment,
@@ -528,16 +525,12 @@ public class TestAtomicOperation extends
final MockHRegion region = (MockHRegion) TestHRegion.initHRegion(
Bytes.toBytes(tableName), tableName, conf, Bytes.toBytes(family));
- List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
Put[] puts = new Put[1];
Put put = new Put(Bytes.toBytes("r1"));
put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
puts[0] = put;
- Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
-
- putsAndLocks.add(pair);
-
- region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+
+ region.batchMutate(puts);
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
ctx.addThread(new PutThread(ctx, region));
@@ -565,15 +558,12 @@ public class TestAtomicOperation extends
}
public void doWork() throws Exception {
- List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
Put[] puts = new Put[1];
Put put = new Put(Bytes.toBytes("r1"));
put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
puts[0] = put;
- Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[0], null);
- putsAndLocks.add(pair);
testStep = TestStep.PUT_STARTED;
- region.batchMutate(putsAndLocks.toArray(new Pair[0]));
+ region.batchMutate(puts);
}
}
@@ -607,43 +597,50 @@ public class TestAtomicOperation extends
}
@Override
- public void releaseRowLock(Integer lockId) {
- if (testStep == TestStep.INIT) {
- super.releaseRowLock(lockId);
- return;
+ public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
+ if (testStep == TestStep.CHECKANDPUT_STARTED) {
+ latch.countDown();
}
+ return new WrappedRowLock(super.getRowLock(row, waitForLock));
+ }
+
+ public class WrappedRowLock extends RowLock {
- if (testStep == TestStep.PUT_STARTED) {
- try {
- testStep = TestStep.PUT_COMPLETED;
- super.releaseRowLock(lockId);
- // put has been written to the memstore and the row lock has been released, but the
- // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
- // operations would cause the non-atomicity to show up:
- // 1) Put releases row lock (where we are now)
- // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
- // because the MVCC has not advanced
- // 3) Put advances MVCC
- // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
- // (see below), and then wait some more to give the checkAndPut time to read the old
- // value.
- latch.await();
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- else if (testStep == TestStep.CHECKANDPUT_STARTED) {
- super.releaseRowLock(lockId);
+ private WrappedRowLock(RowLock rowLock) {
+ super(rowLock.context);
}
- }
- @Override
- public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException {
- if (testStep == TestStep.CHECKANDPUT_STARTED) {
- latch.countDown();
+ @Override
+ public void release() {
+ if (testStep == TestStep.INIT) {
+ super.release();
+ return;
+ }
+
+ if (testStep == TestStep.PUT_STARTED) {
+ try {
+ testStep = TestStep.PUT_COMPLETED;
+ super.release();
+ // put has been written to the memstore and the row lock has been released, but the
+ // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
+ // operations would cause the non-atomicity to show up:
+ // 1) Put releases row lock (where we are now)
+ // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
+ // because the MVCC has not advanced
+ // 3) Put advances MVCC
+ // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
+ // (see getRowLock above), and then wait some more to give the checkAndPut time to read the old
+ // value.
+ latch.await();
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ else if (testStep == TestStep.CHECKANDPUT_STARTED) {
+ super.release();
+ }
}
- return super.getLock(lockid, row, waitForLock);
}
}
}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jul 17 05:33:50 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.client.Du
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -96,6 +95,7 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
@@ -764,7 +763,6 @@ public class TestHRegion extends HBaseTe
}
}
- @SuppressWarnings("unchecked")
public void testBatchPut() throws Exception {
byte[] b = Bytes.toBytes(getName());
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
@@ -783,7 +781,7 @@ public class TestHRegion extends HBaseTe
puts[i].add(cf, qual, val);
}
- OperationStatus[] codes = this.region.put(puts);
+ OperationStatus[] codes = this.region.batchMutate(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SUCCESS, codes[i]
@@ -794,7 +792,7 @@ public class TestHRegion extends HBaseTe
LOG.info("Next a batch put with one invalid family");
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
- codes = this.region.put(puts);
+ codes = this.region.batchMutate(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
@@ -804,7 +802,7 @@ public class TestHRegion extends HBaseTe
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
- Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
+ RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext(conf);
@@ -813,7 +811,7 @@ public class TestHRegion extends HBaseTe
TestThread putter = new TestThread(ctx) {
@Override
public void doWork() throws IOException {
- retFromThread.set(region.put(puts));
+ retFromThread.set(region.batchMutate(puts));
}
};
LOG.info("...starting put thread while holding lock");
@@ -829,7 +827,7 @@ public class TestHRegion extends HBaseTe
}
}
LOG.info("...releasing row lock, which should let put thread continue");
- region.releaseRowLock(lockedRow);
+ rowLock.release();
LOG.info("...joining on thread");
ctx.stop();
LOG.info("...checking that next batch was synced");
@@ -840,29 +838,6 @@ public class TestHRegion extends HBaseTe
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
}
- LOG.info("Nexta, a batch put which uses an already-held lock");
- lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
- LOG.info("...obtained row lock");
- List<Pair<Mutation, Integer>> putsAndLocks = Lists.newArrayList();
- for (int i = 0; i < 10; i++) {
- Pair<Mutation, Integer> pair = new Pair<Mutation, Integer>(puts[i], null);
- if (i == 2) pair.setSecond(lockedRow);
- putsAndLocks.add(pair);
- }
-
- codes = region.batchMutate(putsAndLocks.toArray(new Pair[0]));
- LOG.info("...performed put");
- for (int i = 0; i < 10; i++) {
- assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
- OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
- }
- // Make sure we didn't do an extra batch
- metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 5, source);
-
- // Make sure we still hold lock
- assertTrue(region.isRowLocked(lockedRow));
- LOG.info("...releasing lock");
- region.releaseRowLock(lockedRow);
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
@@ -891,7 +866,7 @@ public class TestHRegion extends HBaseTe
puts[i].add(cf, qual, val);
}
- OperationStatus[] codes = this.region.put(puts);
+ OperationStatus[] codes = this.region.batchMutate(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i]
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java?rev=1504000&r1=1503999&r2=1504000&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java Wed Jul 17 05:33:50 2013
@@ -233,7 +233,7 @@ public class TestParallelPut extends HBa
put.add(fam1, qual1, value);
in[0] = put;
try {
- OperationStatus[] ret = region.put(in);
+ OperationStatus[] ret = region.batchMutate(in);
assertEquals(1, ret.length);
assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
assertGet(rowkey, fam1, qual1, value);