You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/02/21 01:00:31 UTC

[2/4] hbase git commit: HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly

HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7eaf67a3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7eaf67a3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7eaf67a3

Branch: refs/heads/branch-1.4
Commit: 7eaf67a33e36516d8be7a67ce8dadd0d533dc5e4
Parents: f263137
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sun Feb 18 17:46:08 2018 +0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Feb 20 16:59:47 2018 -0800

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 69 +++++++++++---------
 .../hadoop/hbase/client/TestAsyncProcess.java   | 45 +++++++++++++
 .../hadoop/hbase/regionserver/StoreScanner.java |  2 +-
 3 files changed, 83 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7eaf67a3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 1974be3..d207a82 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -16,6 +16,7 @@
 package org.apache.hadoop.hbase.client;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -59,7 +60,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class BufferedMutatorImpl implements BufferedMutator {
 
   private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
-  
+
   private final ExceptionListener listener;
 
   protected ClusterConnection connection; // non-final so can be overridden in test
@@ -228,26 +229,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
     }
 
     if (!synchronous) {
-      QueueRowAccess taker = new QueueRowAccess();
-      try {
+      try (QueueRowAccess taker = createQueueRowAccess()){
         ap.submit(tableName, taker, true, null, false);
         if (ap.hasError()) {
           LOG.debug(tableName + ": One or more of the operations have failed -"
               + " waiting for all operation in progress to finish (successfully or not)");
         }
-      } finally {
-        taker.restoreRemainder();
       }
     }
     if (synchronous || ap.hasError()) {
-      QueueRowAccess taker = new QueueRowAccess();
-      try {
-        while (!taker.isEmpty()) {
+      while (true) {
+        try (QueueRowAccess taker = createQueueRowAccess()){
+          if (taker.isEmpty()) {
+            break;
+          }
           ap.submit(tableName, taker, true, null, false);
-          taker.reset();
         }
-      } finally {
-        taker.restoreRemainder();
       }
 
       RetriesExhaustedWithDetailsException error =
@@ -304,36 +301,35 @@ public class BufferedMutatorImpl implements BufferedMutator {
     return Arrays.asList(writeAsyncBuffer.toArray(new Row[0]));
   }
 
-  private class QueueRowAccess implements RowAccess<Row> {
-    private int remainder = undealtMutationCount.getAndSet(0);
+  @VisibleForTesting
+  QueueRowAccess createQueueRowAccess() {
+    return new QueueRowAccess();
+  }
 
-    void reset() {
-      restoreRemainder();
-      remainder = undealtMutationCount.getAndSet(0);
-    }
+  @VisibleForTesting
+  class QueueRowAccess implements RowAccess<Row>, Closeable {
+    private int remainder = undealtMutationCount.getAndSet(0);
+    private Mutation last = null;
 
     @Override
     public Iterator<Row> iterator() {
       return new Iterator<Row>() {
-        private final Iterator<Mutation> iter = writeAsyncBuffer.iterator();
         private int countDown = remainder;
-        private Mutation last = null;
         @Override
         public boolean hasNext() {
-          if (countDown <= 0) {
-            return false;
-          }
-          return iter.hasNext();
+          return countDown > 0;
         }
         @Override
         public Row next() {
+          restoreLastMutation();
           if (!hasNext()) {
             throw new NoSuchElementException();
           }
-          last = iter.next();
+          last = writeAsyncBuffer.poll();
           if (last == null) {
             throw new NoSuchElementException();
           }
+          currentWriteBufferSize.addAndGet(-last.heapSize());
           --countDown;
           return last;
         }
@@ -342,28 +338,37 @@ public class BufferedMutatorImpl implements BufferedMutator {
           if (last == null) {
             throw new IllegalStateException();
           }
-          iter.remove();
-          currentWriteBufferSize.addAndGet(-last.heapSize());
           --remainder;
+          last = null;
         }
       };
     }
 
+    private void restoreLastMutation() {
+      // restore the last mutation since it isn't submitted
+      if (last != null) {
+        writeAsyncBuffer.add(last);
+        currentWriteBufferSize.addAndGet(last.heapSize());
+        last = null;
+      }
+    }
+
     @Override
     public int size() {
       return remainder;
     }
 
-    void restoreRemainder() {
+    @Override
+    public boolean isEmpty() {
+      return remainder <= 0;
+    }
+    @Override
+    public void close() {
+      restoreLastMutation();
       if (remainder > 0) {
         undealtMutationCount.addAndGet(remainder);
         remainder = 0;
       }
     }
-
-    @Override
-    public boolean isEmpty() {
-      return remainder <= 0;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7eaf67a3/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index e8c7b73..9ed3001 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -1813,4 +1814,48 @@ public class TestAsyncProcess {
     LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
     Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
   }
+
+  @Test
+  public void testQueueRowAccess() throws Exception {
+    ClusterConnection conn = createHConnection();
+    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
+      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
+    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
+    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
+    mutator.mutate(p0);
+    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
+    // QueueRowAccess should take all undealt mutations
+    assertEquals(0, mutator.undealtMutationCount.get());
+    mutator.mutate(p1);
+    assertEquals(1, mutator.undealtMutationCount.get());
+    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
+    // QueueRowAccess should take all undealt mutations
+    assertEquals(0, mutator.undealtMutationCount.get());
+    assertEquals(1, ra0.size());
+    assertEquals(1, ra1.size());
+    Iterator<Row> iter0 = ra0.iterator();
+    Iterator<Row> iter1 = ra1.iterator();
+    assertTrue(iter0.hasNext());
+    assertTrue(iter1.hasNext());
+    // the next() will poll the mutation from inner buffer and update the buffer count
+    assertTrue(iter0.next() == p0);
+    assertEquals(1, mutator.writeAsyncBuffer.size());
+    assertEquals(p1.heapSize(), mutator.currentWriteBufferSize.get());
+    assertTrue(iter1.next() == p1);
+    assertEquals(0, mutator.writeAsyncBuffer.size());
+    assertEquals(0, mutator.currentWriteBufferSize.get());
+    assertFalse(iter0.hasNext());
+    assertFalse(iter1.hasNext());
+    // ra0 does handle the mutation so the mutation won't be pushed back to buffer
+    iter0.remove();
+    ra0.close();
+    assertEquals(0, mutator.undealtMutationCount.get());
+    assertEquals(0, mutator.writeAsyncBuffer.size());
+    assertEquals(0, mutator.currentWriteBufferSize.get());
+    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
+    ra1.close();
+    assertEquals(1, mutator.undealtMutationCount.get());
+    assertEquals(1, mutator.writeAsyncBuffer.size());
+    assertEquals(p1.heapSize(), mutator.currentWriteBufferSize.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7eaf67a3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 632f146..0b2d12f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -847,7 +847,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       // these scanners are properly closed() whether or not the scan is completed successfully
       // Eagerly creating scanners so that we have the ref counting ticking on the newly created
       // store files. In case of stream scanners this eager creation does not induce performance
-      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.   
+      // penalty because in scans (that uses stream scanners) the next() call is bound to happen.
       List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread,
         isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false);
       flushedstoreFileScanners.addAll(scanners);