You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2017/01/23 10:24:16 UTC

hbase git commit: HBASE-17367 Make HTable#getBufferedMutator thread safe

Repository: hbase
Updated Branches:
  refs/heads/master 07e0a30ef -> ba4a926b6


HBASE-17367 Make HTable#getBufferedMutator thread safe


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba4a926b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba4a926b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba4a926b

Branch: refs/heads/master
Commit: ba4a926b62f2e3858e0fae7e74087e20947fd72f
Parents: 07e0a30
Author: Yu Li <li...@apache.org>
Authored: Mon Jan 23 18:23:24 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Mon Jan 23 18:23:24 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/HTable.java  | 19 +++----
 .../hbase/client/TestFromClientSide3.java       | 52 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba4a926b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 3bb0a77..72d71eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -108,7 +108,8 @@ public class HTable implements Table {
   private final Configuration configuration;
   private final ConnectionConfiguration connConfiguration;
   @VisibleForTesting
-  BufferedMutatorImpl mutator;
+  volatile BufferedMutatorImpl mutator; // volatile: getBufferedMutator() reads it outside the lock
+  private final Object mutatorLock = new Object(); // guards lazy creation of mutator
   private boolean closed = false;
   private final int scannerCaching;
   private final long scannerMaxResultSize;
@@ -1333,14 +1334,14 @@ public class HTable implements Table {
   @VisibleForTesting
   BufferedMutator getBufferedMutator() throws IOException {
     if (mutator == null) {
-      this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
-          new BufferedMutatorParams(tableName)
-              .pool(pool)
-              .writeBufferSize(writeBufferSize)
-              .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
-              .opertationTimeout(operationTimeout)
-              .rpcTimeout(writeRpcTimeout)
-      );
+      synchronized (mutatorLock) { // only one thread at a time may create the mutator
+        if (mutator == null) { // re-check: another thread may have created it while we waited
+          this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
+            new BufferedMutatorParams(tableName).pool(pool).writeBufferSize(writeBufferSize)
+                .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
+                .opertationTimeout(operationTimeout).rpcTimeout(writeRpcTimeout));
+        }
+      }
     }
     return mutator;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba4a926b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index b863b40..6f9637f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -28,6 +28,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -749,6 +751,56 @@ public class TestFromClientSide3 {
 
   }
 
+  @Test
+  public void testPutThenGetWithMultipleThreads() throws Exception {
+    TableName TABLE = TableName.valueOf("testParallelPutAndGet");
+    final int THREAD_NUM = 20;
+    final int ROUND_NUM = 10;
+    for (int round = 0; round < ROUND_NUM; round++) { // table is created and deleted every round
+      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
+      final AtomicInteger successCnt = new AtomicInteger(0);
+      Table ht = TEST_UTIL.createTable(TABLE, FAMILY);
+      for (int i = 0; i < THREAD_NUM; i++) {
+        final int index = i;
+        Thread t = new Thread(new Runnable() {
+          // Each worker puts its own row through the shared Table, then reads it back.
+          @Override
+          public void run() {
+            final byte[] row = Bytes.toBytes("row-" + index);
+            final byte[] value = Bytes.toBytes("v" + index);
+            try {
+              Put put = new Put(row);
+              put.addColumn(FAMILY, QUALIFIER, value);
+              ht.put(put);
+              Get get = new Get(row);
+              Result result = ht.get(get);
+              byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
+              if (Bytes.equals(value, returnedValue)) {
+                successCnt.getAndIncrement();
+              } else {
+                LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
+                    + ", returned value: "
+                    + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
+              }
+            } catch (Throwable e) { // not rethrown; the missing success fails the assert below
+              LOG.error("Put-then-get failed for row-" + index, e);
+            }
+          }
+        });
+        threads.add(t);
+      }
+      for (Thread t : threads) {
+        t.start();
+      }
+      for (Thread t : threads) {
+        t.join();
+      }
+      assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
+      ht.close();
+      TEST_UTIL.deleteTable(TABLE);
+    }
+  }
+
   private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
     HRegion region = (HRegion) find(tableName);
     assertEquals(0, region.getLockedRows().size());