You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/02/21 01:00:33 UTC
[4/4] hbase git commit: HBASE-20017 BufferedMutatorImpl submit the
same mutation repeatedly
HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bc1ac49d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bc1ac49d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bc1ac49d
Branch: refs/heads/branch-2
Commit: bc1ac49de2c2f9a54f00dc28a5aeb0dc7b113022
Parents: c5ca3c2
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sun Feb 18 21:45:04 2018 +0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Feb 20 16:59:48 2018 -0800
----------------------------------------------------------------------
.../hbase/client/BufferedMutatorImpl.java | 47 +++++++++++++++-----
.../hadoop/hbase/client/TestAsyncProcess.java | 44 ++++++++++++++++++
.../hbase/regionserver/wal/ReaderBase.java | 2 +-
3 files changed, 81 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/bc1ac49d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 9d24b4d..d4bc811 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -295,7 +295,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
break;
}
AsyncRequestFuture asf;
- try (QueueRowAccess access = new QueueRowAccess()) {
+ try (QueueRowAccess access = createQueueRowAccess()) {
if (access.isEmpty()) {
// It means someone has gotten the ticker to run the flush.
break;
@@ -406,16 +406,46 @@ public class BufferedMutatorImpl implements BufferedMutator {
return currentWriteBufferSize.get();
}
+ /**
+ * Count the mutations which haven't been processed yet.
+ * @return the number of undealt (not yet processed) mutations
+ */
@VisibleForTesting
int size() {
return undealtMutationCount.get();
}
- private class QueueRowAccess implements RowAccess<Row>, Closeable {
+ /**
+ * Count the mutations which haven't been flushed.
+ * @return the number of unflushed mutations
+ */
+ @VisibleForTesting
+ int getUnflushedSize() {
+ return writeAsyncBuffer.size();
+ }
+
+ @VisibleForTesting
+ QueueRowAccess createQueueRowAccess() {
+ return new QueueRowAccess();
+ }
+
+ @VisibleForTesting
+ class QueueRowAccess implements RowAccess<Row>, Closeable {
private int remainder = undealtMutationCount.getAndSet(0);
+ private Mutation last = null;
+
+ private void restoreLastMutation() {
+ // restore the last mutation since it isn't submitted
+ if (last != null) {
+ writeAsyncBuffer.add(last);
+ currentWriteBufferSize.addAndGet(last.heapSize());
+ last = null;
+ }
+ }
@Override
public void close() {
+ restoreLastMutation();
if (remainder > 0) {
undealtMutationCount.addAndGet(remainder);
remainder = 0;
@@ -425,25 +455,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
@Override
public Iterator<Row> iterator() {
return new Iterator<Row>() {
- private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
private int countDown = remainder;
- private Mutation last = null;
@Override
public boolean hasNext() {
- if (countDown <= 0) {
- return false;
- }
- return iter.hasNext();
+ return countDown > 0;
}
@Override
public Row next() {
+ restoreLastMutation();
if (!hasNext()) {
throw new NoSuchElementException();
}
- last = iter.next();
+ last = writeAsyncBuffer.poll();
if (last == null) {
throw new NoSuchElementException();
}
+ currentWriteBufferSize.addAndGet(-last.heapSize());
--countDown;
return last;
}
@@ -452,8 +479,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (last == null) {
throw new IllegalStateException();
}
- iter.remove();
- currentWriteBufferSize.addAndGet(-last.heapSize());
--remainder;
last = null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bc1ac49d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 2979dcd..4a2ed8d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1795,4 +1795,48 @@ public class TestAsyncProcess {
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
}
+
+ @Test
+ public void testQueueRowAccess() throws Exception {
+ ClusterConnection conn = createHConnection();
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
+ new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
+ Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
+ Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
+ mutator.mutate(p0);
+ BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
+ // QueueRowAccess should take all undealt mutations
+ assertEquals(0, mutator.size());
+ mutator.mutate(p1);
+ assertEquals(1, mutator.size());
+ BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
+ // QueueRowAccess should take all undealt mutations
+ assertEquals(0, mutator.size());
+ assertEquals(1, ra0.size());
+ assertEquals(1, ra1.size());
+ Iterator<Row> iter0 = ra0.iterator();
+ Iterator<Row> iter1 = ra1.iterator();
+ assertTrue(iter0.hasNext());
+ assertTrue(iter1.hasNext());
+ // next() polls the mutation from the inner buffer and decreases the current write buffer size
+ assertTrue(iter0.next() == p0);
+ assertEquals(1, mutator.getUnflushedSize());
+ assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
+ assertTrue(iter1.next() == p1);
+ assertEquals(0, mutator.getUnflushedSize());
+ assertEquals(0, mutator.getCurrentWriteBufferSize());
+ assertFalse(iter0.hasNext());
+ assertFalse(iter1.hasNext());
+ // ra0 does handle the mutation (via iter0.remove()), so the mutation won't be pushed back to the buffer
+ iter0.remove();
+ ra0.close();
+ assertEquals(0, mutator.size());
+ assertEquals(0, mutator.getUnflushedSize());
+ assertEquals(0, mutator.getCurrentWriteBufferSize());
+ // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
+ ra1.close();
+ assertEquals(1, mutator.size());
+ assertEquals(1, mutator.getUnflushedSize());
+ assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bc1ac49d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index f242cef..4338f6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -139,7 +139,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
* Initializes the compression after the shared stuff has been initialized. Called once.
*/
protected abstract void initAfterCompression() throws IOException;
-
+
/**
* Initializes the compression after the shared stuff has been initialized. Called once.
* @param cellCodecClsName class name of cell Codec