You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/12/01 01:08:09 UTC
hbase git commit: HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)
Repository: hbase
Updated Branches:
refs/heads/branch-1.4 21eb8ba6d -> 6c490625a
HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)
This patch, together with sorting the batch (HBASE-17924), fixes a regression
in Increment/CheckAndPut-style operations.
Signed-off-by: Yu Li <ca...@gmail.com>
Signed-off-by: Allan Yang <al...@163.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6c490625
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6c490625
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6c490625
Branch: refs/heads/branch-1.4
Commit: 6c490625aa102ed33a99352b7b308e2a9c2f3c35
Parents: 21eb8ba
Author: Michael Stack <st...@apache.org>
Authored: Tue Nov 28 09:14:58 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Thu Nov 30 17:08:06 2017 -0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 100 +++++++++----
.../hadoop/hbase/client/TestMultiParallel.java | 148 +++++++++++++++++++
.../hbase/regionserver/TestAtomicOperation.java | 5 +-
3 files changed, 225 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6c490625/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1b178f6..06f2990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2191,7 +2191,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Should the store be flushed because it is old enough.
* <p>
* Every FlushPolicy should call this to determine whether a store is old enough to flush(except
- * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
+ * that you always flush all stores). Otherwise the shouldFlush method will always
* returns true which will make a lot of flush requests.
*/
boolean shouldFlushStore(Store store) {
@@ -3243,11 +3243,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
continue;
}
+
+ //HBASE-18233
// If we haven't got any rows in our batch, we should block to
- // get the next one.
+ // get the next one's read lock. We need at least one row to mutate.
+ // If we have got rows, do not block when lock is not available,
+ // so that we can fail fast and go on with the rows with locks in
+ // the batch. By doing this, we can reduce contention and prevent
+ // possible deadlocks.
+ // The unfinished rows in the batch will be detected in batchMutate,
+ // and it wil try to finish them by calling doMiniBatchMutation again.
+ boolean shouldBlock = numReadyToWrite == 0;
RowLock rowLock = null;
try {
- rowLock = getRowLockInternal(mutation.getRow(), true);
+ rowLock = getRowLockInternal(mutation.getRow(), true, shouldBlock);
} catch (TimeoutIOException e) {
// We will retry when other exceptions, but we should stop if we timeout .
throw e;
@@ -3256,8 +3265,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ Bytes.toStringBinary(mutation.getRow()), ioe);
}
if (rowLock == null) {
- // We failed to grab another lock
- break; // stop acquiring more rows for this batch
+ // We failed to grab another lock. Stop acquiring more rows for this
+ // batch and go on with the gotten ones
+ break;
+
} else {
acquiredRowLocks.add(rowLock);
}
@@ -3356,7 +3367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now);
// Acquire row locks. If not, the whole batch will fail.
- acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
+ acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true, true));
// Returned mutations from coprocessor correspond to the Mutation at index i. We can
// directly add the cells from those mutations to the familyMaps of this mutation.
@@ -3676,7 +3687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.addColumn(family, qualifier);
checkRow(row, "checkAndMutate");
// Lock row - note that doBatchMutate will relock this row if called
- RowLock rowLock = getRowLockInternal(get.getRow(), false);
+ RowLock rowLock = getRowLockInternal(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.await();
try {
@@ -3786,7 +3797,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.addColumn(family, qualifier);
checkRow(row, "checkAndRowMutate");
// Lock row - note that doBatchMutate will relock this row if called
- RowLock rowLock = getRowLockInternal(get.getRow(), false);
+ RowLock rowLock = getRowLockInternal(get.getRow());
// wait for all previous transactions to complete (with lock held)
mvcc.await();
try {
@@ -4025,10 +4036,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* <b>not</b> check the families for validity.
*
* @param familyMap Map of kvs per family
- * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
- * If null, then this method internally creates a mvcc transaction.
- * @param output newly added KVs into memstore
- * @param isInReplay true when adding replayed KVs into memstore
* @return the additional memory usage of the memstore caused by the
* new entries.
* @throws IOException
@@ -5429,7 +5436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Get an exclusive ( write lock ) lock on a given row.
* @param row Which row to lock.
* @return A locked RowLock. The lock is exclusive and already aqquired.
- * @throws IOException
+ * @throws IOException if any error occurred
*/
public RowLock getRowLock(byte[] row) throws IOException {
return getRowLock(row, false);
@@ -5437,12 +5444,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+ return getRowLock(row, readLock, true);
+ }
+
+ /**
+ *
+ * Get a row lock for the specified row. All locks are reentrant.
+ *
+ * Before calling this function make sure that a region operation has already been
+ * started (the calling thread has already acquired the region-close-guard lock).
+ * @param row The row actions will be performed against
+ * @param readLock is the lock reader or writer. True indicates that a non-exlcusive
+ * lock is requested
+ * @param waitForLock whether should wait for this lock
+ * @return A locked RowLock, or null if {@code waitForLock} set to false and tryLock failed
+ * @throws IOException if any error occurred
+ */
+ public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException {
// Make sure the row is inside of this region before getting the lock for it.
checkRow(row, "row lock");
- return getRowLockInternal(row, readLock);
+ return getRowLockInternal(row, readLock, waitForLock);
}
- protected RowLock getRowLockInternal(byte[] row, boolean readLock) throws IOException {
+ // getRowLock calls checkRow. Call this to skip checkRow.
+ protected RowLock getRowLockInternal(byte[] row)
+ throws IOException {
+ return getRowLockInternal(row, false, true);
+ }
+
+ protected RowLock getRowLockInternal(byte[] row, boolean readLock, boolean waitForLock)
+ throws IOException {
// create an object to use a a key in the row lock map
HashedBytes rowKey = new HashedBytes(row);
@@ -5493,18 +5524,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ boolean lockAvailable = false;
+ if (timeout > 0) {
+ if (waitForLock) {
+ // if waiting for lock, wait for timeout milliseconds
+ lockAvailable = result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+ } else {
+ // If we are not waiting, tryLock() returns immediately whether we have the lock or not.
+ lockAvailable = result.getLock().tryLock();
+ }
+ }
+ if (!lockAvailable) {
if (traceScope != null) {
traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
}
result = null;
- String message = "Timed out waiting for lock for row: " + rowKey + " in region "
- + getRegionInfo().getEncodedName();
- if (reachDeadlineFirst) {
- throw new TimeoutIOException(message);
+ String message = "Timed out waiting for lock for row: " + rowKey + " in region " +
+ getRegionInfo().getEncodedName() + ", timeout=" + timeout + ", deadlined=" +
+ reachDeadlineFirst + ", waitForLock=" + waitForLock;
+ if (waitForLock) {
+ if (reachDeadlineFirst) {
+ LOG.info("TIMEOUT: " + message);
+ throw new TimeoutIOException(message);
+ } else {
+ // If timeToDeadline is larger than rowLockWaitDuration, we can not drop the request.
+ LOG.info("IOE " + message);
+ throw new IOException(message);
+ }
} else {
- // If timeToDeadline is larger than rowLockWaitDuration, we can not drop the request.
- throw new IOException(message);
+ // We are here if we did a tryLock w/o waiting on it.
+ return null;
}
}
rowLockContext.setThreadName(Thread.currentThread().getName());
@@ -7393,7 +7442,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (byte[] row : rowsToLock) {
// Attempt to lock all involved rows, throw if any lock times out
// use a writer lock for mixed reads and writes
- acquiredRowLocks.add(getRowLockInternal(row, false));
+ acquiredRowLocks.add(getRowLockInternal(row));
}
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
@@ -7625,7 +7674,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALKey walKey = null;
boolean doRollBackMemstore = false;
try {
- rowLock = getRowLockInternal(row, false);
+ rowLock = getRowLockInternal(row);
assert rowLock != null;
try {
lock(this.updatesLock.readLock());
@@ -7907,7 +7956,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Map<Store, List<Cell>> forMemStore = new HashMap<>();
Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
try {
- rowLock = getRowLockInternal(increment.getRow(), false);
+ rowLock = getRowLockInternal(increment.getRow());
long txid = 0;
try {
lock(this.updatesLock.readLock());
@@ -8126,7 +8175,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* from <code>incrementCoordinates</code> only.
* @param increment
* @param columnFamily
- * @param incrementCoordinates
* @return Return the Cells to Increment
* @throws IOException
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/6c490625/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 4a80a26..2e5cc54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -63,6 +64,7 @@ public class TestMultiParallel {
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
private static final String FAMILY = "family";
private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
+ private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2");
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
private static final byte [][] KEYS = makeKeys();
@@ -761,4 +763,150 @@ public class TestMultiParallel {
validateEmpty(result);
}
}
+
+ private static class MultiThread extends Thread {
+ public Throwable throwable = null;
+ private CountDownLatch endLatch;
+ private CountDownLatch beginLatch;
+ List<Put> puts;
+ public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+ this.puts = puts;
+ this.beginLatch = beginLatch;
+ this.endLatch = endLatch;
+ }
+ @Override
+ public void run() {
+ LOG.info("Start multi");
+ HTable table = null;
+ try {
+ table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+ table.setAutoFlush(false);
+ beginLatch.await();
+ for (int i = 0; i < 100; i++) {
+ for(Put put : puts) {
+ table.put(put);
+ }
+ table.flushCommits();
+ }
+ } catch (Throwable t) {
+ throwable = t;
+ LOG.warn("Error when put:", t);
+ } finally {
+ endLatch.countDown();
+ if(table != null) {
+ try {
+ table.close();
+ } catch (IOException ioe) {
+ LOG.error("Error when close table", ioe);
+ }
+ }
+ }
+ LOG.info("End multi");
+ }
+ }
+
+
+ private static class IncrementThread extends Thread {
+ public Throwable throwable = null;
+ private CountDownLatch endLatch;
+ private CountDownLatch beginLatch;
+ List<Put> puts;
+ public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+ this.puts = puts;
+ this.beginLatch = beginLatch;
+ this.endLatch = endLatch;
+ }
+ @Override
+ public void run() {
+ LOG.info("Start inc");
+ HTable table = null;
+ try {
+ table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+ beginLatch.await();
+ for (int i = 0; i < 100; i++) {
+ for(Put put : puts) {
+ Increment inc = new Increment(put.getRow());
+ inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1);
+ table.increment(inc);
+ }
+ }
+ } catch (Throwable t) {
+ throwable = t;
+ LOG.warn("Error when incr:", t);
+ } finally {
+ endLatch.countDown();
+ if(table != null) {
+ try {
+ table.close();
+ } catch (IOException ioe) {
+ LOG.error("Error when close table", ioe);
+ }
+ }
+ }
+ LOG.info("End inc");
+ }
+ }
+
+ /**
+ * UT for HBASE-18233, test for disordered batch mutation thread and
+ * increment won't lock each other
+ * @throws Exception if any error occurred
+ */
+ @Test(timeout=300000)
+ public void testMultiThreadWithRowLocks() throws Exception {
+ //set a short timeout to get timeout exception when getting row lock fail
+ UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000);
+ UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000);
+ UTIL.getConfiguration().setInt("hbase.client.retries.number", 10);
+
+ UTIL.createTable(TEST_TABLE2, BYTES_FAMILY);
+ List<Put> puts = new ArrayList<>();
+ for(int i = 0; i < 10; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long)0));
+ puts.add(put);
+ }
+ List<Put> reversePuts = new ArrayList<>(puts);
+ Collections.reverse(reversePuts);
+ int NUM_OF_THREAD = 12;
+ CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD);
+ CountDownLatch beginLatch = new CountDownLatch(1);
+ int threadNum = NUM_OF_THREAD / 4;
+ List<MultiThread> multiThreads = new ArrayList<>();
+ List<IncrementThread> incThreads = new ArrayList<>();
+ for(int i = 0; i < threadNum; i ++) {
+ MultiThread thread = new MultiThread(reversePuts, beginLatch, latch);
+ thread.start();
+ multiThreads.add(thread);
+ }
+ for(int i = 0; i < threadNum; i++) {
+ MultiThread thread = new MultiThread(puts, beginLatch, latch);
+ thread.start();
+ multiThreads.add(thread);
+ }
+ for(int i = 0; i < threadNum; i ++) {
+ IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch);
+ thread.start();
+ incThreads.add(thread);
+ }
+ for(int i = 0; i < threadNum; i++) {
+ IncrementThread thread = new IncrementThread(puts, beginLatch, latch);
+ thread.start();
+ incThreads.add(thread);
+ }
+ long timeBegin = System.currentTimeMillis();
+ beginLatch.countDown();
+ latch.await();
+ LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin));
+ for(MultiThread thread : multiThreads) {
+ if (thread != null && thread.throwable != null) {
+ LOG.error(thread.throwable);
+ }
+ Assert.assertTrue(thread.throwable == null);
+ }
+ for(IncrementThread thread : incThreads) {
+ Assert.assertTrue(thread.throwable == null);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6c490625/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index edef899..d9415df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -663,11 +663,12 @@ public class TestAtomicOperation {
}
@Override
- public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
+ public RowLock getRowLockInternal(final byte[] row, boolean readLock, boolean waitForLock)
+ throws IOException {
if (testStep == TestStep.CHECKANDPUT_STARTED) {
latch.countDown();
}
- return new WrappedRowLock(super.getRowLockInternal(row, readLock));
+ return new WrappedRowLock(super.getRowLockInternal(row, readLock, waitForLock));
}
public class WrappedRowLock implements RowLock {