You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/02/22 06:59:13 UTC

[24/50] [abbrv] hbase git commit: HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly

HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79d9403a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79d9403a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79d9403a

Branch: refs/heads/HBASE-19064
Commit: 79d9403a79cca60e614834659e3d9005d5482cac
Parents: 0068b95
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sun Feb 18 21:45:04 2018 +0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Feb 20 16:59:48 2018 -0800

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 47 +++++++++++++++-----
 .../hadoop/hbase/client/TestAsyncProcess.java   | 44 ++++++++++++++++++
 .../hbase/regionserver/wal/ReaderBase.java      |  2 +-
 3 files changed, 81 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/79d9403a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 9d24b4d..d4bc811 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -295,7 +295,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
         break;
       }
       AsyncRequestFuture asf;
-      try (QueueRowAccess access = new QueueRowAccess()) {
+      try (QueueRowAccess access = createQueueRowAccess()) {
         if (access.isEmpty()) {
           // It means someone has gotten the ticker to run the flush.
           break;
@@ -406,16 +406,46 @@ public class BufferedMutatorImpl implements BufferedMutator {
     return currentWriteBufferSize.get();
   }
 
+  /**
+   * Count the mutations which haven't been processed.
+   * @return count of undealt mutations
+   */
   @VisibleForTesting
   int size() {
     return undealtMutationCount.get();
   }
 
-  private class QueueRowAccess implements RowAccess<Row>, Closeable {
+  /**
+   * Count the mutations which haven't been flushed
+   * @return count of unflushed mutations
+   */
+  @VisibleForTesting
+  int getUnflushedSize() {
+    return writeAsyncBuffer.size();
+  }
+
+  @VisibleForTesting
+  QueueRowAccess createQueueRowAccess() {
+    return new QueueRowAccess();
+  }
+
+  @VisibleForTesting
+  class QueueRowAccess implements RowAccess<Row>, Closeable {
     private int remainder = undealtMutationCount.getAndSet(0);
+    private Mutation last = null;
+
+    private void restoreLastMutation() {
+      // restore the last mutation since it isn't submitted
+      if (last != null) {
+        writeAsyncBuffer.add(last);
+        currentWriteBufferSize.addAndGet(last.heapSize());
+        last = null;
+      }
+    }
 
     @Override
     public void close() {
+      restoreLastMutation();
       if (remainder > 0) {
         undealtMutationCount.addAndGet(remainder);
         remainder = 0;
@@ -425,25 +455,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
     @Override
     public Iterator<Row> iterator() {
       return new Iterator<Row>() {
-        private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
         private int countDown = remainder;
-        private Mutation last = null;
         @Override
         public boolean hasNext() {
-          if (countDown <= 0) {
-            return false;
-          }
-          return iter.hasNext();
+          return countDown > 0;
         }
         @Override
         public Row next() {
+          restoreLastMutation();
           if (!hasNext()) {
             throw new NoSuchElementException();
           }
-          last = iter.next();
+          last = writeAsyncBuffer.poll();
           if (last == null) {
             throw new NoSuchElementException();
           }
+          currentWriteBufferSize.addAndGet(-last.heapSize());
           --countDown;
           return last;
         }
@@ -452,8 +479,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
           if (last == null) {
             throw new IllegalStateException();
           }
-          iter.remove();
-          currentWriteBufferSize.addAndGet(-last.heapSize());
           --remainder;
           last = null;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d9403a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 2979dcd..4a2ed8d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1795,4 +1795,48 @@ public class TestAsyncProcess {
     LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
     Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
   }
+
+  @Test
+  public void testQueueRowAccess() throws Exception {
+    ClusterConnection conn = createHConnection();
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
+      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
+    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
+    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
+    mutator.mutate(p0);
+    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
+    // QueueRowAccess should take all undealt mutations
+    assertEquals(0, mutator.size());
+    mutator.mutate(p1);
+    assertEquals(1, mutator.size());
+    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
+    // QueueRowAccess should take all undealt mutations
+    assertEquals(0, mutator.size());
+    assertEquals(1, ra0.size());
+    assertEquals(1, ra1.size());
+    Iterator<Row> iter0 = ra0.iterator();
+    Iterator<Row> iter1 = ra1.iterator();
+    assertTrue(iter0.hasNext());
+    assertTrue(iter1.hasNext());
+    // the next() will poll the mutation from inner buffer and update the buffer count
+    assertTrue(iter0.next() == p0);
+    assertEquals(1, mutator.getUnflushedSize());
+    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
+    assertTrue(iter1.next() == p1);
+    assertEquals(0, mutator.getUnflushedSize());
+    assertEquals(0, mutator.getCurrentWriteBufferSize());
+    assertFalse(iter0.hasNext());
+    assertFalse(iter1.hasNext());
+    // ra0 does handle the mutation so the mutation won't be pushed back to buffer
+    iter0.remove();
+    ra0.close();
+    assertEquals(0, mutator.size());
+    assertEquals(0, mutator.getUnflushedSize());
+    assertEquals(0, mutator.getCurrentWriteBufferSize());
+    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
+    ra1.close();
+    assertEquals(1, mutator.size());
+    assertEquals(1, mutator.getUnflushedSize());
+    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/79d9403a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index f242cef..4338f6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -139,7 +139,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
    * Initializes the compression after the shared stuff has been initialized. Called once.
    */
   protected abstract void initAfterCompression() throws IOException;
-  
+
   /**
    * Initializes the compression after the shared stuff has been initialized. Called once.
    * @param cellCodecClsName class name of cell Codec