You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/10/26 18:34:10 UTC

[04/14] hbase git commit: HBASE-14683 Fix Batching in buffered mutator is awful when adding lists of mutations.

HBASE-14683 Fix Batching in buffered mutator is awful when adding lists of mutations.

Summary: Send the list of mutations to AsyncProcess only after the whole list has been added; otherwise there is a lot of contention.

Test Plan: UnitTests.

Differential Revision: https://reviews.facebook.net/D49251


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/caae3b2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/caae3b2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/caae3b2e

Branch: refs/heads/hbase-12439
Commit: caae3b2e5fe889d57dc69964e76faffdc16ff7ae
Parents: f9efeaa
Author: Elliott Clark <ec...@apache.org>
Authored: Thu Oct 22 20:24:48 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Sat Oct 24 10:05:39 2015 -0700

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 46 ++++++++------------
 .../master/cleaner/TestSnapshotFromMaster.java  | 15 ++++---
 2 files changed, 25 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/caae3b2e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 0b222b1..5341d47 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -98,48 +99,35 @@ public class BufferedMutatorImpl implements BufferedMutator {
   @Override
   public synchronized void mutate(Mutation m) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
-    doMutate(m);
+    mutate(Arrays.asList(m));
   }
 
   @Override
   public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
       RetriesExhaustedWithDetailsException {
-    for (Mutation m : ms) {
-      doMutate(m);
-    }
-  }
-
-  /**
-   * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
-   * cluster.
-   *
-   * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
-   * @throws InterruptedIOException if we were interrupted.
-   */
-  private void doMutate(Mutation m) throws InterruptedIOException,
-      RetriesExhaustedWithDetailsException {
     if (closed) {
       throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
     }
-    if (!(m instanceof Put) && !(m instanceof Delete)) {
-      throw new IllegalArgumentException("Pass a Delete or a Put");
-    }
 
-    // This behavior is highly non-intuitive... it does not protect us against
-    // 94-incompatible behavior, which is a timing issue because hasError, the below code
-    // and setter of hasError are not synchronized. Perhaps it should be removed.
-    if (ap.hasError()) {
-      writeAsyncBuffer.add(m);
-      backgroundFlushCommits(true);
+    for (Mutation m : ms) {
+      if (m instanceof Put) {
+        validatePut((Put) m);
+      }
+        currentWriteBufferSize += m.heapSize();
     }
 
-    if (m instanceof Put) {
-      validatePut((Put) m);
-    }
+      // This behavior is highly non-intuitive... it does not protect us against
+      // 94-incompatible behavior, which is a timing issue because hasError, the below code
+      // and setter of hasError are not synchronized. Perhaps it should be removed.
+      if (ap.hasError()) {
+        writeAsyncBuffer.addAll(ms);
+        backgroundFlushCommits(true);
+      } else {
+        writeAsyncBuffer.addAll(ms);
+      }
 
-    currentWriteBufferSize += m.heapSize();
-    writeAsyncBuffer.add(m);
 
+    // Now try and queue what needs to be queued.
     while (currentWriteBufferSize > writeBufferSize) {
       backgroundFlushCommits(false);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/caae3b2e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index e00134a..687b6ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -111,7 +111,7 @@ public class TestSnapshotFromMaster {
     conf.setInt("hbase.hregion.memstore.flush.size", 25000);
     // so make sure we get a compaction when doing a load, but keep around some
     // files in the store
-    conf.setInt("hbase.hstore.compaction.min", 3);
+    conf.setInt("hbase.hstore.compaction.min", 2);
     conf.setInt("hbase.hstore.compactionThreshold", 5);
     // block writes if we get to 12 store files
     conf.setInt("hbase.hstore.blockingStoreFiles", 12);
@@ -288,11 +288,12 @@ public class TestSnapshotFromMaster {
     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
     htd.setCompactionEnabled(false);
     UTIL.createTable(htd, new byte[][] { TEST_FAM }, null);
-    // load the table (creates 4 hfiles)
-    UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
-    UTIL.flush(TABLE_NAME);
-    // Put some more data into the table so for sure we get more storefiles.
-    UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
+
+    // load the table (creates at least 4 hfiles)
+    for ( int i = 0; i < 5; i++) {
+      UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
+      UTIL.flush(TABLE_NAME);
+    }
 
     // disable the table so we can take a snapshot
     admin.disableTable(TABLE_NAME);
@@ -319,7 +320,7 @@ public class TestSnapshotFromMaster {
     List<HRegion> regions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
     for (HRegion region : regions) {
       region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
-      region.compactStores(); // min is 3 so will compact and archive
+      region.compactStores(); // min is 2 so will compact and archive
     }
     LOG.info("After compaction File-System state");
     FSUtils.logFileSystemState(fs, rootDir, LOG);