You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/05/18 08:07:32 UTC

hbase git commit: HBASE-18019 Close redundant memstore scanners

Repository: hbase
Updated Branches:
  refs/heads/master 62d732302 -> 32d2062b5


HBASE-18019 Close redundant memstore scanners


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/32d2062b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/32d2062b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/32d2062b

Branch: refs/heads/master
Commit: 32d2062b5ccfbdb3630a9fceee98200d36a5c885
Parents: 62d7323
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Thu May 18 16:07:21 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Thu May 18 16:07:21 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java | 11 +--
 .../hadoop/hbase/regionserver/TestStore.java    | 75 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/32d2062b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 953e911..c7bf78d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -146,10 +146,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private volatile boolean flushed = false;
   // generally we get one file from a flush
   private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
-  // generally we get one memstore scanner from a flush
-  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
+  // Since CompactingMemstore is now default, we get three memstore scanners from a flush
+  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
   // The current list of scanners
-  private final List<KeyValueScanner> currentScanners = new ArrayList<>();
+  @VisibleForTesting
+  final List<KeyValueScanner> currentScanners = new ArrayList<>();
   // flush update lock
   private final ReentrantLock flushLock = new ReentrantLock();
 
@@ -876,9 +877,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // Seek the new scanners to the last key
     seekScanners(scanners, lastTop, false, parallelSeekEnabled);
     // remove the older memstore scanner
-    for (int i = 0; i < currentScanners.size(); i++) {
+    for (int i = currentScanners.size() - 1; i >=0; i--) {
       if (!currentScanners.get(i).isFileScanner()) {
         currentScanners.remove(i).close();
+      } else {
+        // we add the memstore scanner to the end of currentScanners
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/32d2062b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 6ea8eaa..ff213b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -44,9 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -958,6 +957,76 @@ public class TestStore {
     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
   }
 
+  private long countMemStoreScanner(StoreScanner scanner) {
+    if (scanner.currentScanners == null) {
+      return 0;
+    }
+    return scanner.currentScanners.stream()
+            .filter(s -> !s.isFileScanner())
+            .count();
+  }
+
+  @Test
+  public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
+    long seqId = 100;
+    long timestamp = System.currentTimeMillis();
+    Cell cell0 = CellUtil.createCell(row, family, qf1, timestamp,
+            KeyValue.Type.Put.getCode(), qf1);
+    CellUtil.setSequenceId(cell0, seqId);
+    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.EMPTY_LIST);
+
+    Cell cell1 = CellUtil.createCell(row, family, qf2, timestamp,
+            KeyValue.Type.Put.getCode(), qf1);
+    CellUtil.setSequenceId(cell1, seqId);
+    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
+
+    seqId = 101;
+    timestamp = System.currentTimeMillis();
+    Cell cell2 = CellUtil.createCell(row2, family, qf2, timestamp,
+            KeyValue.Type.Put.getCode(), qf1);
+     CellUtil.setSequenceId(cell2, seqId);
+    testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
+  }
+
+  private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
+      List<Cell> inputCellsAfterSnapshot) throws IOException {
+    init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    long seqId = Long.MIN_VALUE;
+    for (Cell c : inputCellsBeforeSnapshot) {
+      quals.add(CellUtil.cloneQualifier(c));
+      seqId = Math.max(seqId, c.getSequenceId());
+    }
+    for (Cell c : inputCellsAfterSnapshot) {
+      quals.add(CellUtil.cloneQualifier(c));
+      seqId = Math.max(seqId, c.getSequenceId());
+    }
+    inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    storeFlushCtx.prepare();
+    inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
+    int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
+    try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
+      // snaptshot + active (if it isn't empty)
+      assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s));
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+      boolean more;
+      int cellCount = 0;
+      do {
+        List<Cell> cells = new ArrayList<>();
+        more = s.next(cells);
+        cellCount += cells.size();
+        assertEquals(more ? numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s));
+      } while (more);
+      assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
+          + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
+          inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
+      // the current scanners is cleared
+      assertEquals(0, countMemStoreScanner(s));
+    }
+  }
+
   private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
     Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
     CellUtil.setSequenceId(c, sequenceId);