You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2017/01/23 23:02:12 UTC
[39/50] [abbrv] hbase git commit: HBASE-17367 Make HTable#getBufferedMutator thread safe
HBASE-17367 Make HTable#getBufferedMutator thread safe
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba4a926b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba4a926b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba4a926b
Branch: refs/heads/HBASE-16961
Commit: ba4a926b62f2e3858e0fae7e74087e20947fd72f
Parents: 07e0a30
Author: Yu Li <li...@apache.org>
Authored: Mon Jan 23 18:23:24 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Mon Jan 23 18:23:24 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/HTable.java | 19 +++----
.../hbase/client/TestFromClientSide3.java | 52 ++++++++++++++++++++
2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba4a926b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 3bb0a77..72d71eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -108,7 +108,8 @@ public class HTable implements Table {
private final Configuration configuration;
private final ConnectionConfiguration connConfiguration;
@VisibleForTesting
- BufferedMutatorImpl mutator;
+ volatile BufferedMutatorImpl mutator;
+ private final Object mutatorLock = new Object();
private boolean closed = false;
private final int scannerCaching;
private final long scannerMaxResultSize;
@@ -1333,14 +1334,14 @@ public class HTable implements Table {
@VisibleForTesting
BufferedMutator getBufferedMutator() throws IOException {
if (mutator == null) {
- this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
- new BufferedMutatorParams(tableName)
- .pool(pool)
- .writeBufferSize(writeBufferSize)
- .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
- .opertationTimeout(operationTimeout)
- .rpcTimeout(writeRpcTimeout)
- );
+ synchronized (mutatorLock) {
+ if (mutator == null) {
+ this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
+ new BufferedMutatorParams(tableName).pool(pool).writeBufferSize(writeBufferSize)
+ .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
+ .opertationTimeout(operationTimeout).rpcTimeout(writeRpcTimeout));
+ }
+ }
}
return mutator;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba4a926b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index b863b40..6f9637f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -28,6 +28,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -749,6 +751,56 @@ public class TestFromClientSide3 {
}
+ @Test
+ public void testPutThenGetWithMultipleThreads() throws Exception {
+ TableName TABLE = TableName.valueOf("testParallelPutAndGet");
+ final int THREAD_NUM = 20;
+ final int ROUND_NUM = 10;
+ for (int round = 0; round < ROUND_NUM; round++) {
+ ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
+ final AtomicInteger successCnt = new AtomicInteger(0);
+ Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
+ for (int i = 0; i < THREAD_NUM; i++) {
+ final int index = i;
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ final byte[] row = Bytes.toBytes("row-" + index);
+ final byte[] value = Bytes.toBytes("v" + index);
+ try {
+ Put put = new Put(row);
+ put.addColumn(FAMILY, QUALIFIER, value);
+ ht.put(put);
+ Get get = new Get(row);
+ Result result = ht.get(get);
+ byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
+ if (Bytes.equals(value, returnedValue)) {
+ successCnt.getAndIncrement();
+ } else {
+ LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
+ + ", returned value: "
+ + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
+ }
+ } catch (Throwable e) {
+ // do nothing
+ }
+ }
+ });
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.start();
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
+ ht.close();
+ TEST_UTIL.deleteTable(TABLE);
+ }
+ }
+
private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
HRegion region = (HRegion) find(tableName);
assertEquals(0, region.getLockedRows().size());