You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/07/28 06:07:26 UTC
hbase git commit: HBASE-17131 Avoid livelock caused by
HRegion#processRowsWithLocks
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 488ae4be6 -> f18f916f0
HBASE-17131 Avoid livelock caused by HRegion#processRowsWithLocks
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f18f916f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f18f916f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f18f916f
Branch: refs/heads/branch-1.3
Commit: f18f916f050cf4dc106543d3dc7c6d2f78077661
Parents: 488ae4b
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Fri Jul 28 03:06:41 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Jul 28 14:07:01 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 28 ++--
.../hbase/client/TestFromClientSide3.java | 139 ++++++++++++++++++-
2 files changed, 152 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f18f916f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c2dae91..42a2389 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7203,28 +7203,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
- boolean locked;
+ boolean locked = false;
boolean walSyncSuccessful = false;
- List<RowLock> acquiredRowLocks;
+ List<RowLock> acquiredRowLocks = null;
long addedSize = 0;
List<Mutation> mutations = new ArrayList<Mutation>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
WALKey walKey = null;
try {
- // 2. Acquire the row lock(s)
- acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
- for (byte[] row : rowsToLock) {
- // Attempt to lock all involved rows, throw if any lock times out
- // use a writer lock for mixed reads and writes
- acquiredRowLocks.add(getRowLockInternal(row, false));
- }
- // 3. Region lock
- lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
- locked = true;
-
- long now = EnvironmentEdgeManager.currentTime();
try {
+ // 2. Acquire the row lock(s)
+ acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
+ for (byte[] row : rowsToLock) {
+ // Attempt to lock all involved rows, throw if any lock times out
+ // use a writer lock for mixed reads and writes
+ acquiredRowLocks.add(getRowLockInternal(row, false));
+ }
+ // 3. Region lock
+ lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
+ locked = true;
+
+ long now = EnvironmentEdgeManager.currentTime();
// 4. Let the processor scan the rows, generate mutations and add
// waledits
doProcessRowWithTimeout(
http://git-wip-us.apache.org/repos/asf/hbase/blob/f18f916f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 09c7e86..08ccc42 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -28,16 +29,36 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import static junit.framework.Assert.assertFalse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -486,4 +507,120 @@ public class TestFromClientSide3 {
assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
table.close();
}
+
+  /**
+   * Checks that a multi-row mutation which cannot complete fails as a whole and leaves no
+   * row locks behind.
+   * <p>
+   * A plain put on ROW-1 is parked inside {@link WatiingForMultiMutationsObserver}. While it
+   * is parked, a MultiRowMutation covering ROW-0 and ROW-1 is issued and is expected to
+   * fail. Once the put is released, only the put's value may be visible and the region must
+   * report no locked rows.
+   */
+  @Test(timeout = 30000)
+  public void testMultiRowMutations() throws Exception, Throwable {
+    final TableName tableName = TableName.valueOf("testMultiRowMutations");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
+    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TEST_UTIL.getHBaseAdmin().createTable(desc);
+    // Create a separate connection so the lower retry count only applies to this test.
+    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
+    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection con = ConnectionFactory.createConnection(copy)) {
+      final byte[] row = Bytes.toBytes("ROW-0");
+      final byte[] rowLocked = Bytes.toBytes("ROW-1");
+      final byte[] value0 = Bytes.toBytes("VALUE-0");
+      final byte[] value1 = Bytes.toBytes("VALUE-1");
+      final byte[] value2 = Bytes.toBytes("VALUE-2");
+      assertNoLocks(tableName);
+      ExecutorService putService = Executors.newSingleThreadExecutor();
+      putService.execute(new Runnable() {
+        @Override
+        public void run() {
+          try (Table table = con.getTable(tableName)) {
+            Put put0 = new Put(rowLocked);
+            put0.addColumn(FAMILY, QUALIFIER, value0);
+            // the put will be blocked by WatiingForMultiMutationsObserver.
+            table.put(put0);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      });
+      // NOTE(review): nothing here waits for the put to actually reach the observer before
+      // the multi-row mutation is sent; the ordering relies on timing - confirm.
+      //
+      // Set as soon as the multi-row mutation is known to have failed. The outcome is carried
+      // back to the test thread through this flag because an assertion raised on the executor
+      // thread (and then caught below) would never fail the test.
+      final java.util.concurrent.atomic.AtomicBoolean cpFailed =
+          new java.util.concurrent.atomic.AtomicBoolean(false);
+      ExecutorService cpService = Executors.newSingleThreadExecutor();
+      cpService.execute(new Runnable() {
+        @Override
+        public void run() {
+          Put put1 = new Put(row);
+          Put put2 = new Put(rowLocked);
+          put1.addColumn(FAMILY, QUALIFIER, value1);
+          put2.addColumn(FAMILY, QUALIFIER, value2);
+          try (Table table = con.getTable(tableName)) {
+            final MultiRowMutationProtos.MutateRowsRequest request
+              = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
+                .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+                  org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1))
+                .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
+                  org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
+                .build();
+            table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class, ROW, ROW,
+              new Batch.Call<MultiRowMutationProtos.MultiRowMutationService, MultiRowMutationProtos.MutateRowsResponse>() {
+                @Override
+                public MultiRowMutationProtos.MutateRowsResponse call(MultiRowMutationProtos.MultiRowMutationService instance) throws IOException {
+                  ServerRpcController controller = new ServerRpcController();
+                  BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse> rpcCallback = new BlockingRpcCallback<>();
+                  instance.mutateRows(controller, request, rpcCallback);
+                  MultiRowMutationProtos.MutateRowsResponse response = rpcCallback.get();
+                  // A server-side failure may be reported through the controller rather
+                  // than thrown to the caller, so inspect it explicitly.
+                  if (controller.failedOnException()) {
+                    cpFailed.set(true);
+                  }
+                  return response;
+                }
+              });
+          } catch (Throwable ex) {
+            // An exception reaching the client also means the mutation did not go through.
+            cpFailed.set(true);
+          }
+        }
+      });
+      cpService.shutdown();
+      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      // Release the parked put only after the multi-row mutation has finished.
+      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      observer.latch.countDown();
+      putService.shutdown();
+      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      // Checked after the latch is released so a failing assertion cannot leave the put
+      // parked inside the observer.
+      assertTrue("This cp should fail because the target lock is blocked by previous put",
+        cpFailed.get());
+      try (Table table = con.getTable(tableName)) {
+        Get g0 = new Get(row);
+        Get g1 = new Get(rowLocked);
+        Result r0 = table.get(g0);
+        Result r1 = table.get(g1);
+        // Nothing from the failed multi-row mutation may be visible ...
+        assertTrue(r0.isEmpty());
+        // ... while the plain put must have gone through with its own value.
+        assertFalse(r1.isEmpty());
+        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
+      }
+      assertNoLocks(tableName);
+    }
+  }
+
+ private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
+ HRegion region = (HRegion) find(tableName);
+ assertEquals(0, region.getLockedRows().size());
+ }
+ private static Region find(final TableName tableName)
+ throws IOException, InterruptedException {
+ HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+ List<Region> regions = rs.getOnlineRegions(tableName);
+ assertEquals(1, regions.size());
+ return regions.get(0);
+ }
+
+  /**
+   * Returns the coprocessor of type {@code clz} registered on the table's single region.
+   *
+   * @param tableName table whose only online region is inspected
+   * @param clz expected coprocessor class; its name is used for the lookup
+   * @return the coprocessor instance, cast to {@code clz}
+   */
+  private static <T extends RegionObserver> T find(final TableName tableName,
+      Class<T> clz) throws IOException, InterruptedException {
+    Region region = find(tableName);
+    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
+    // Build the message null-safely: if the lookup yields nothing (presumably null - verify
+    // against CoprocessorHost), the assertion below should fail with a readable message
+    // instead of a NullPointerException raised while formatting it.
+    String actual = cp == null ? "null" : cp.getClass().getName();
+    assertTrue("The cp instance should be " + clz.getName()
+        + ", current instance is " + actual, clz.isInstance(cp));
+    return clz.cast(cp);
+  }
+
+  /**
+   * Region observer that blocks in {@code preBatchMutate} until {@link #latch} is counted
+   * down, letting a test keep a mutation parked for as long as it needs.
+   */
+  public static class WatiingForMultiMutationsObserver extends BaseRegionObserver {
+    /** Released by the test to let parked mutations continue. */
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    /**
+     * Waits on {@link #latch} before letting the batch proceed.
+     *
+     * @throws IOException wrapping the {@link InterruptedException} if the wait is interrupted
+     */
+    @Override
+    public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      try {
+        latch.await();
+      } catch (InterruptedException ex) {
+        // Restore the interrupt status so callers further up the stack can still see it.
+        Thread.currentThread().interrupt();
+        throw new IOException(ex);
+      }
+    }
+  }
}