You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/12/05 19:28:06 UTC
hbase git commit: HBASE-12639 Backport HBASE-12565 Race condition in HRegion.batchMutate() causes partial data to be written when region closes
Repository: hbase
Updated Branches:
refs/heads/0.98 5668b6c8e -> 45c8be3fc
HBASE-12639 Backport HBASE-12565 Race condition in HRegion.batchMutate() causes partial data to be written when region closes
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45c8be3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45c8be3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45c8be3f
Branch: refs/heads/0.98
Commit: 45c8be3fc76c2813032c576f81439954a498d6d7
Parents: 5668b6c
Author: stack <st...@apache.org>
Authored: Fri Dec 5 10:27:57 2014 -0800
Committer: stack <st...@apache.org>
Committed: Fri Dec 5 10:27:57 2014 -0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 107 ++++++++++---------
.../hbase/regionserver/TestAtomicOperation.java | 4 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 107 +++++++++++++++----
3 files changed, 144 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/45c8be3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index cebef32..1d554fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2245,25 +2245,22 @@ public class HRegion implements HeapSize { // , Writable{
/**
* Perform a batch of mutations.
* It supports only Put and Delete mutations and will ignore other types passed.
- * @param mutations the list of mutations
+ * @param batchOp contains the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
boolean initialized = false;
- while (!batchOp.isDone()) {
- if (!batchOp.isInReplay()) {
- checkReadOnly();
- }
- checkResources();
-
- long newSize;
- Operation op = Operation.BATCH_MUTATE;
- if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
- startRegionOperation(op);
+ Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
+ startRegionOperation(op);
+ try {
+ while (!batchOp.isDone()) {
+ if (!batchOp.isInReplay()) {
+ checkReadOnly();
+ }
+ checkResources();
- try {
if (!initialized) {
this.writeRequestsCount.add(batchOp.operations.length);
if (!batchOp.isInReplay()) {
@@ -2272,13 +2269,13 @@ public class HRegion implements HeapSize { // , Writable{
initialized = true;
}
long addedSize = doMiniBatchMutation(batchOp);
- newSize = this.addAndGetGlobalMemstoreSize(addedSize);
- } finally {
- closeRegionOperation(op);
- }
- if (isFlushSize(newSize)) {
- requestFlush();
+ long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
+ if (isFlushSize(newSize)) {
+ requestFlush();
+ }
}
+ } finally {
+ closeRegionOperation(op);
}
return batchOp.retCodeDetails;
}
@@ -2404,7 +2401,7 @@ public class HRegion implements HeapSize { // , Writable{
boolean shouldBlock = numReadyToWrite == 0;
RowLock rowLock = null;
try {
- rowLock = getRowLock(mutation.getRow(), shouldBlock);
+ rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
} catch (IOException ioe) {
LOG.warn("Failed getting lock in batch put, row="
+ Bytes.toStringBinary(mutation.getRow()), ioe);
@@ -3575,45 +3572,53 @@ public class HRegion implements HeapSize { // , Writable{
* @throws IOException if waitForLock was true and the lock could not be acquired after waiting
*/
public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
- checkRow(row, "row lock");
startRegionOperation();
try {
- HashedBytes rowKey = new HashedBytes(row);
- RowLockContext rowLockContext = new RowLockContext(rowKey);
+ return getRowLockInternal(row, waitForLock);
+ } finally {
+ closeRegionOperation();
+ }
+ }
- // loop until we acquire the row lock (unless !waitForLock)
- while (true) {
- RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
- if (existingContext == null) {
- // Row is not already locked by any thread, use newly created context.
- break;
- } else if (existingContext.ownedByCurrentThread()) {
- // Row is already locked by current thread, reuse existing context instead.
- rowLockContext = existingContext;
- break;
- } else {
- // Row is already locked by some other thread, give up or wait for it
- if (!waitForLock) {
- return null;
- }
- try {
- if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
- throw new IOException("Timed out waiting for lock for row: " + rowKey);
- }
- } catch (InterruptedException ie) {
- LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
- InterruptedIOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
+ /**
+ * A version of getRowLock(byte[], boolean) to use when a region operation has already been
+ * started (the calling thread has already acquired the region-close-lock).
+ */
+ protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
+ checkRow(row, "row lock");
+ HashedBytes rowKey = new HashedBytes(row);
+ RowLockContext rowLockContext = new RowLockContext(rowKey);
+
+ // loop until we acquire the row lock (unless !waitForLock)
+ while (true) {
+ RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+ if (existingContext == null) {
+ // Row is not already locked by any thread, use newly created context.
+ break;
+ } else if (existingContext.ownedByCurrentThread()) {
+ // Row is already locked by current thread, reuse existing context instead.
+ rowLockContext = existingContext;
+ break;
+ } else {
+ // Row is already locked by some other thread, give up or wait for it
+ if (!waitForLock) {
+ return null;
+ }
+ try {
+ if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+ throw new IOException("Timed out waiting for lock for row: " + rowKey);
}
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
}
}
-
- // allocate new lock for this thread
- return rowLockContext.newLock();
- } finally {
- closeRegionOperation();
}
+
+ // allocate new lock for this thread
+ return rowLockContext.newLock();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/45c8be3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 257579a..96da717 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -612,11 +612,11 @@ public class TestAtomicOperation {
}
@Override
- public RowLock getRowLock(final byte[] row, boolean waitForLock) throws IOException {
+ public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
- return new WrappedRowLock(super.getRowLock(row, waitForLock));
+ return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
}
public class WrappedRowLock extends RowLock {
http://git-wip-us.apache.org/repos/asf/hbase/blob/45c8be3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a26aeb5..777b3a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1005,12 +1005,11 @@ public class TestHRegion {
}
@Test
- public void testBatchPut() throws Exception {
- byte[] b = Bytes.toBytes(getName());
+ public void testBatchPut_whileNoRowLocksHeld() throws IOException {
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
byte[] qual = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("val");
- this.region = initHRegion(b, getName(), CONF, cf);
+ this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
try {
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
@@ -1040,9 +1039,34 @@ public class TestHRegion {
}
metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
+ } finally {
+ HRegion.closeHRegion(this.region);
+ this.region = null;
+ }
+ }
+
+ @Test
+ public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
+ byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
+ byte[] qual = Bytes.toBytes("qual");
+ byte[] val = Bytes.toBytes("val");
+ this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
+ MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
+ try {
+ long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
+ metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
+
+ final Put[] puts = new Put[10];
+ for (int i = 0; i < 10; i++) {
+ puts[i] = new Put(Bytes.toBytes("row_" + i));
+ puts[i].add(cf, qual, val);
+ }
+ puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
- LOG.info("Next a batch put that has to break into two batches to avoid a lock");
- RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
+ LOG.info("batchPut will have to break into four batches to avoid row locks");
+ RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
+ RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_4"));
+ RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_6"));
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
@@ -1052,36 +1076,77 @@ public class TestHRegion {
retFromThread.set(region.batchMutate(puts));
}
};
- LOG.info("...starting put thread while holding lock");
+ LOG.info("...starting put thread while holding locks");
ctx.addThread(putter);
ctx.startThreads();
- LOG.info("...waiting for put thread to sync first time");
- long startWait = System.currentTimeMillis();
- while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
- Thread.sleep(100);
- if (System.currentTimeMillis() - startWait > 10000) {
- fail("Timed out waiting for thread to sync first minibatch");
+ LOG.info("...waiting for put thread to sync 1st time");
+ waitForCounter(source, "syncTimeNumOps", syncs + 1);
+
+ // Now attempt to close the region from another thread. Prior to HBASE-12565
+ // this would cause the in-progress batchMutate operation to fail with an
+ // exception, because it used to release and re-acquire the close-guard lock
+ // between batches; the caller then got no status indicating which writes succeeded.
+ // We now expect this thread to block until the batchMutate call finishes.
+ Thread regionCloseThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ HRegion.closeHRegion(region);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- LOG.info("...releasing row lock, which should let put thread continue");
- rowLock.release();
- LOG.info("...joining on thread");
+ };
+ regionCloseThread.start();
+
+ LOG.info("...releasing row lock 1, which should let put thread continue");
+ rowLock1.release();
+
+ LOG.info("...waiting for put thread to sync 2nd time");
+ waitForCounter(source, "syncTimeNumOps", syncs + 2);
+
+ LOG.info("...releasing row lock 2, which should let put thread continue");
+ rowLock2.release();
+
+ LOG.info("...waiting for put thread to sync 3rd time");
+ waitForCounter(source, "syncTimeNumOps", syncs + 3);
+
+ LOG.info("...releasing row lock 3, which should let put thread continue");
+ rowLock3.release();
+
+ LOG.info("...waiting for put thread to sync 4th time");
+ waitForCounter(source, "syncTimeNumOps", syncs + 4);
+
+ LOG.info("...joining on put thread");
ctx.stop();
- LOG.info("...checking that next batch was synced");
- metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
- codes = retFromThread.get();
- for (int i = 0; i < 10; i++) {
+ regionCloseThread.join();
+
+ OperationStatus[] codes = retFromThread.get();
+ for (int i = 0; i < codes.length; i++) {
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
codes[i].getOperationStatusCode());
}
-
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
+ private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
+ throws InterruptedException {
+ long startWait = System.currentTimeMillis();
+ long currentCount;
+ while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
+ Thread.sleep(100);
+ if (System.currentTimeMillis() - startWait > 10000) {
+ fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
+ expectedCount, currentCount));
+ }
+ }
+ }
+
+
@Test
public void testBatchPutWithTsSlop() throws Exception {
byte[] b = Bytes.toBytes(getName());