You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2015/10/26 18:34:10 UTC
[04/14] hbase git commit: HBASE-14683 Fix Batching in buffered
mutator is awful when adding lists of mutations.
HBASE-14683 Fix Batching in buffered mutator is awful when adding lists of mutations.
Summary: Send the list of mutations to AsyncProcess only after done adding the list otherwise there's a lot of contention
Test Plan: UnitTests.
Differential Revision: https://reviews.facebook.net/D49251
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/caae3b2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/caae3b2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/caae3b2e
Branch: refs/heads/hbase-12439
Commit: caae3b2e5fe889d57dc69964e76faffdc16ff7ae
Parents: f9efeaa
Author: Elliott Clark <ec...@apache.org>
Authored: Thu Oct 22 20:24:48 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Sat Oct 24 10:05:39 2015 -0700
----------------------------------------------------------------------
.../hbase/client/BufferedMutatorImpl.java | 46 ++++++++------------
.../master/cleaner/TestSnapshotFromMaster.java | 15 ++++---
2 files changed, 25 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/caae3b2e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 0b222b1..5341d47 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -98,48 +99,35 @@ public class BufferedMutatorImpl implements BufferedMutator {
@Override
public synchronized void mutate(Mutation m) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
- doMutate(m);
+ mutate(Arrays.asList(m));
}
@Override
public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
- for (Mutation m : ms) {
- doMutate(m);
- }
- }
-
- /**
- * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
- * cluster.
- *
- * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
- * @throws InterruptedIOException if we were interrupted.
- */
- private void doMutate(Mutation m) throws InterruptedIOException,
- RetriesExhaustedWithDetailsException {
if (closed) {
throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
}
- if (!(m instanceof Put) && !(m instanceof Delete)) {
- throw new IllegalArgumentException("Pass a Delete or a Put");
- }
- // This behavior is highly non-intuitive... it does not protect us against
- // 94-incompatible behavior, which is a timing issue because hasError, the below code
- // and setter of hasError are not synchronized. Perhaps it should be removed.
- if (ap.hasError()) {
- writeAsyncBuffer.add(m);
- backgroundFlushCommits(true);
+ for (Mutation m : ms) {
+ if (m instanceof Put) {
+ validatePut((Put) m);
+ }
+ currentWriteBufferSize += m.heapSize();
}
- if (m instanceof Put) {
- validatePut((Put) m);
- }
+ // This behavior is highly non-intuitive... it does not protect us against
+ // 94-incompatible behavior, which is a timing issue because hasError, the below code
+ // and setter of hasError are not synchronized. Perhaps it should be removed.
+ if (ap.hasError()) {
+ writeAsyncBuffer.addAll(ms);
+ backgroundFlushCommits(true);
+ } else {
+ writeAsyncBuffer.addAll(ms);
+ }
- currentWriteBufferSize += m.heapSize();
- writeAsyncBuffer.add(m);
+ // Now try and queue what needs to be queued.
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/caae3b2e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index e00134a..687b6ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -111,7 +111,7 @@ public class TestSnapshotFromMaster {
conf.setInt("hbase.hregion.memstore.flush.size", 25000);
// so make sure we get a compaction when doing a load, but keep around some
// files in the store
- conf.setInt("hbase.hstore.compaction.min", 3);
+ conf.setInt("hbase.hstore.compaction.min", 2);
conf.setInt("hbase.hstore.compactionThreshold", 5);
// block writes if we get to 12 store files
conf.setInt("hbase.hstore.blockingStoreFiles", 12);
@@ -288,11 +288,12 @@ public class TestSnapshotFromMaster {
HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
htd.setCompactionEnabled(false);
UTIL.createTable(htd, new byte[][] { TEST_FAM }, null);
- // load the table (creates 4 hfiles)
- UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
- UTIL.flush(TABLE_NAME);
- // Put some more data into the table so for sure we get more storefiles.
- UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
+
+ // load the table (creates at least 4 hfiles)
+ for ( int i = 0; i < 5; i++) {
+ UTIL.loadTable(UTIL.getConnection().getTable(TABLE_NAME), TEST_FAM);
+ UTIL.flush(TABLE_NAME);
+ }
// disable the table so we can take a snapshot
admin.disableTable(TABLE_NAME);
@@ -319,7 +320,7 @@ public class TestSnapshotFromMaster {
List<HRegion> regions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
for (HRegion region : regions) {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
- region.compactStores(); // min is 3 so will compact and archive
+ region.compactStores(); // min is 2 so will compact and archive
}
LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG);