You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/05/18 08:07:32 UTC
hbase git commit: HBASE-18019 Close redundant memstore scanners
Repository: hbase
Updated Branches:
refs/heads/master 62d732302 -> 32d2062b5
HBASE-18019 Close redundant memstore scanners
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/32d2062b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/32d2062b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/32d2062b
Branch: refs/heads/master
Commit: 32d2062b5ccfbdb3630a9fceee98200d36a5c885
Parents: 62d7323
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Thu May 18 16:07:21 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Thu May 18 16:07:21 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/StoreScanner.java | 11 +--
.../hadoop/hbase/regionserver/TestStore.java | 75 +++++++++++++++++++-
2 files changed, 79 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/32d2062b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 953e911..c7bf78d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -146,10 +146,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private volatile boolean flushed = false;
// generally we get one file from a flush
private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
- // generally we get one memstore scanner from a flush
- private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
+ // Since CompactingMemstore is now default, we get three memstore scanners from a flush
+ private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
// The current list of scanners
- private final List<KeyValueScanner> currentScanners = new ArrayList<>();
+ @VisibleForTesting
+ final List<KeyValueScanner> currentScanners = new ArrayList<>();
// flush update lock
private final ReentrantLock flushLock = new ReentrantLock();
@@ -876,9 +877,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Seek the new scanners to the last key
seekScanners(scanners, lastTop, false, parallelSeekEnabled);
// remove the older memstore scanner
- for (int i = 0; i < currentScanners.size(); i++) {
+ for (int i = currentScanners.size() - 1; i >=0; i--) {
if (!currentScanners.get(i).isFileScanner()) {
currentScanners.remove(i).close();
+ } else {
+ // memstore scanners are always appended to the end of currentScanners, so stop at the first file scanner
break;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/32d2062b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 6ea8eaa..ff213b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -44,9 +45,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -958,6 +957,76 @@ public class TestStore {
verify(spiedStore, times(0)).replaceStoreFiles(null, null);
}
+ private long countMemStoreScanner(StoreScanner scanner) {
+ if (scanner.currentScanners == null) {
+ return 0;
+ }
+ return scanner.currentScanners.stream()
+ .filter(s -> !s.isFileScanner())
+ .count();
+ }
+
+ @Test
+ public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
+ long seqId = 100;
+ long timestamp = System.currentTimeMillis();
+ Cell cell0 = CellUtil.createCell(row, family, qf1, timestamp,
+ KeyValue.Type.Put.getCode(), qf1);
+ CellUtil.setSequenceId(cell0, seqId);
+ testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.EMPTY_LIST);
+
+ Cell cell1 = CellUtil.createCell(row, family, qf2, timestamp,
+ KeyValue.Type.Put.getCode(), qf1);
+ CellUtil.setSequenceId(cell1, seqId);
+ testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
+
+ seqId = 101;
+ timestamp = System.currentTimeMillis();
+ Cell cell2 = CellUtil.createCell(row2, family, qf2, timestamp,
+ KeyValue.Type.Put.getCode(), qf1);
+ CellUtil.setSequenceId(cell2, seqId);
+ testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
+ }
+
+ private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
+ List<Cell> inputCellsAfterSnapshot) throws IOException {
+ init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
+ TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ long seqId = Long.MIN_VALUE;
+ for (Cell c : inputCellsBeforeSnapshot) {
+ quals.add(CellUtil.cloneQualifier(c));
+ seqId = Math.max(seqId, c.getSequenceId());
+ }
+ for (Cell c : inputCellsAfterSnapshot) {
+ quals.add(CellUtil.cloneQualifier(c));
+ seqId = Math.max(seqId, c.getSequenceId());
+ }
+ inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
+ StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+ storeFlushCtx.prepare();
+ inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
+ int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
+ try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
+ // snapshot + active (if it isn't empty)
+ assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s));
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+ boolean more;
+ int cellCount = 0;
+ do {
+ List<Cell> cells = new ArrayList<>();
+ more = s.next(cells);
+ cellCount += cells.size();
+ assertEquals(more ? numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s));
+ } while (more);
+ assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
+ + ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
+ inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
+ // the current scanners are cleared
+ assertEquals(0, countMemStoreScanner(s));
+ }
+ }
+
private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
CellUtil.setSequenceId(c, sequenceId);