You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2017/06/23 05:03:22 UTC
hbase git commit: HBASE-18221 Switch from pread to stream should
happen under HStore's reentrant lock (Ram)
Repository: hbase
Updated Branches:
refs/heads/master 7cc458e12 -> d09200876
HBASE-18221 Switch from pread to stream should happen under HStore's
reentrant lock (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0920087
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0920087
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0920087
Branch: refs/heads/master
Commit: d092008766c460de329d14d40e9cfd2377dcaf01
Parents: 7cc458e
Author: Ramkrishna <ra...@intel.com>
Authored: Fri Jun 23 10:32:29 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Fri Jun 23 10:32:29 2017 +0530
----------------------------------------------------------------------
.../regionserver/DefaultStoreFileManager.java | 16 +-
.../hadoop/hbase/regionserver/HStore.java | 46 ++++++
.../apache/hadoop/hbase/regionserver/Store.java | 28 ++++
.../hbase/regionserver/StoreFileManager.java | 6 +
.../hadoop/hbase/regionserver/StoreScanner.java | 26 ++--
.../regionserver/StripeStoreFileManager.java | 5 +
.../hadoop/hbase/regionserver/TestStore.java | 146 ++++++++++++++++---
7 files changed, 235 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index f4f9aa6..915d62f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -27,10 +27,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
/**
* Default implementation of StoreFileManager. Not thread-safe.
*/
@@ -117,6 +117,14 @@ class DefaultStoreFileManager implements StoreFileManager {
}
@Override
+ public final int getCompactedFilesCount() {
+ if (compactedfiles == null) {
+ return 0;
+ }
+ return compactedfiles.size();
+ }
+
+ @Override
public void addCompactionResults(
Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ed0f201..9ab52c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -727,6 +728,11 @@ public class HStore implements Store {
return this.storeEngine.getStoreFileManager().getStorefiles();
}
+ @Override
+ public Collection<StoreFile> getCompactedFiles() {
+ return this.storeEngine.getStoreFileManager().getCompactedfiles();
+ }
+
/**
* This throws a WrongRegionException if the HFile does not fit in this region, or an
* InvalidHFileException if the HFile is not valid.
@@ -1928,6 +1934,41 @@ public class HStore implements Store {
}
@Override
+ public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+ boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+ byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+ boolean includeMemstoreScanner) throws IOException {
+ this.lock.readLock().lock();
+ try {
+ Map<String, StoreFile> name2File =
+ new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
+ for (StoreFile file : getStorefiles()) {
+ name2File.put(file.getFileInfo().getActiveFileName(), file);
+ }
+ if (getCompactedFiles() != null) {
+ for (StoreFile file : getCompactedFiles()) {
+ name2File.put(file.getFileInfo().getActiveFileName(), file);
+ }
+ }
+ List<StoreFile> filesToReopen = new ArrayList<>();
+ for (KeyValueScanner kvs : currentFileScanners) {
+ assert kvs.isFileScanner();
+ if (kvs.peek() == null) {
+ continue;
+ }
+ filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+ }
+ if (filesToReopen.isEmpty()) {
+ return null;
+ }
+ return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
+ includeStartRow, stopRow, includeStopRow, readPt, false);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ @Override
public String toString() {
return this.getColumnFamilyName();
}
@@ -1938,6 +1979,11 @@ public class HStore implements Store {
}
@Override
+ public int getCompactedFilesCount() {
+ return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
+ }
+
+ @Override
public long getMaxStoreFileAge() {
long earliestTS = Long.MAX_VALUE;
for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f5e90eb..c0df66a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
Collection<StoreFile> getStorefiles();
+ Collection<StoreFile> getCompactedFiles();
+
/**
* Close all the readers We don't need to worry about subsequent requests because the Region
* holds a write lock that will prevent any more reads or writes.
@@ -116,6 +118,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
boolean includeStopRow, long readPt) throws IOException;
/**
+ * Recreates the scanners on the current list of active store file scanners
+ * @param currentFileScanners the current set of active store file scanners
+ * @param cacheBlocks cache the blocks or not
+ * @param usePread use pread or not
+ * @param isCompaction is the scanner for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the scan's start row
+ * @param includeStartRow should the scan include the start row
+ * @param stopRow the scan's stop row
+ * @param includeStopRow should the scan include the stop row
+ * @param readPt the read point of the current scane
+ * @param includeMemstoreScanner whether the current scanner should include memstorescanner
+ * @return list of scanners recreated on the current Scanners
+ * @throws IOException
+ */
+ List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+ boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+ byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+ boolean includeMemstoreScanner) throws IOException;
+
+ /**
* Create scanners on the given files and if needed on the memstore with no filtering based on TTL
* (that happens further down the line).
* @param files the list of files on which the scanners has to be created
@@ -367,6 +390,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
int getStorefilesCount();
/**
+ * @return Count of compacted store files
+ */
+ int getCompactedFilesCount();
+
+ /**
* @return Max age of store files in this store
*/
long getMaxStoreFileAge();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 933849c..eb760ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -104,6 +104,12 @@ public interface StoreFileManager {
int getStorefileCount();
/**
+ * Returns the number of compacted files.
+ * @return The number of files.
+ */
+ int getCompactedFilesCount();
+
+ /**
* Gets the store files to scan for a Scan or Get request.
* @param startRow Start row of the request.
* @param stopRow Stop row of the request.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 9849c93..11301d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return heap.reseek(kv);
}
- private void trySwitchToStreamRead() {
+ @VisibleForTesting
+ void trySwitchToStreamRead() {
if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
bytesRead < preadMaxBytes) {
return;
@@ -977,34 +978,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
scanUsePread = false;
Cell lastTop = heap.peek();
- Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
- for (StoreFile file : store.getStorefiles()) {
- name2File.put(file.getFileInfo().getActiveFileName(), file);
- }
- List<StoreFile> filesToReopen = new ArrayList<>();
List<KeyValueScanner> memstoreScanners = new ArrayList<>();
List<KeyValueScanner> scannersToClose = new ArrayList<>();
for (KeyValueScanner kvs : currentScanners) {
if (!kvs.isFileScanner()) {
+ // collect memstorescanners here
memstoreScanners.add(kvs);
} else {
scannersToClose.add(kvs);
- if (kvs.peek() == null) {
- continue;
- }
- filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
}
}
- if (filesToReopen.isEmpty()) {
- return;
- }
List<KeyValueScanner> fileScanners = null;
List<KeyValueScanner> newCurrentScanners;
KeyValueHeap newHeap;
try {
- fileScanners =
- store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
- scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
+ // recreate the scanners on the current file scanners
+ fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
+ matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
+ scan.includeStopRow(), readPt, false);
+ if (fileScanners == null) {
+ return;
+ }
seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
newCurrentScanners.addAll(fileScanners);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 3c7469e..18a6eec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -147,6 +147,11 @@ public class StripeStoreFileManager
}
@Override
+ public int getCompactedFilesCount() {
+ return state.allCompactedFilesCached.size();
+ }
+
+ @Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
// Passing null does not cause NPE??
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 22539c5..2318414 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
@@ -104,7 +106,6 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Test class for the Store
@@ -161,19 +162,19 @@ public class TestStore {
init(methodName, TEST_UTIL.getConfiguration());
}
- private void init(String methodName, Configuration conf)
+ private Store init(String methodName, Configuration conf)
throws IOException {
HColumnDescriptor hcd = new HColumnDescriptor(family);
// some of the tests write 4 versions and then flush
// (with HBASE-4241, lower versions are collected on flush)
hcd.setMaxVersions(4);
- init(methodName, conf, hcd);
+ return init(methodName, conf, hcd);
}
- private void init(String methodName, Configuration conf,
+ private Store init(String methodName, Configuration conf,
HColumnDescriptor hcd) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
- init(methodName, conf, htd, hcd);
+ return init(methodName, conf, htd, hcd);
}
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
@@ -184,6 +185,11 @@ public class TestStore {
@SuppressWarnings("deprecation")
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
+ return init(methodName, conf, htd, hcd, hook, false);
+ }
+ @SuppressWarnings("deprecation")
+ private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+ HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException {
//Setting up a Store
Path basedir = new Path(DIR+methodName);
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -198,7 +204,8 @@ public class TestStore {
} else {
htd.addFamily(hcd);
}
- ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+ MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, basedir);
@@ -208,7 +215,7 @@ public class TestStore {
if (hook == null) {
store = new HStore(region, hcd, conf);
} else {
- store = new MyStore(region, hcd, conf, hook);
+ store = new MyStore(region, hcd, conf, hook, switchToPread);
}
return store;
}
@@ -833,9 +840,10 @@ public class TestStore {
public static class DummyStoreEngine extends DefaultStoreEngine {
public static DefaultCompactor lastCreatedCompactor = null;
+
@Override
- protected void createComponents(
- Configuration conf, Store store, CellComparator comparator) throws IOException {
+ protected void createComponents(Configuration conf, Store store, CellComparator comparator)
+ throws IOException {
super.createComponents(conf, store, comparator);
lastCreatedCompactor = this.compactor;
}
@@ -1039,6 +1047,13 @@ public class TestStore {
return c;
}
+ private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
+ throws IOException {
+ Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+ CellUtil.setSequenceId(c, sequenceId);
+ return c;
+ }
+
@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
@@ -1269,35 +1284,130 @@ public class TestStore {
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
- private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+ private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
+ throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMaxVersions(5);
return (MyStore) init(methodName, conf, htd, hcd, hook);
}
- private static class MyStore extends HStore {
+ class MyStore extends HStore {
private final MyScannerHook hook;
- MyStore(final HRegion region, final HColumnDescriptor family,
- final Configuration confParam, MyScannerHook hook) throws IOException {
+
+ MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
+ MyScannerHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam);
this.hook = hook;
}
@Override
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
- boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
- boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
- boolean includeMemstoreScanner) throws IOException {
+ boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+ boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+ boolean includeMemstoreScanner) throws IOException {
hook.hook(this);
- return super.getScanners(files, cacheBlocks, usePread,
- isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
+ return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true,
+ stopRow, false, readPt, includeMemstoreScanner);
}
}
private interface MyScannerHook {
void hook(MyStore store) throws IOException;
}
+ @Test
+ public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception {
+ int flushSize = 500;
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
+ conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
+ // Set the lower threshold to invoke the "MERGE" policy
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+ MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+ @Override
+ public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
+ throws IOException {
+ }
+ });
+ MemstoreSize memStoreSize = new MemstoreSize();
+ long ts = System.currentTimeMillis();
+ long seqID = 1l;
+ // Add some data to the region and do some flushes
+ for (int i = 1; i < 10; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+ for (int i = 11; i < 20; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+ for (int i = 21; i < 30; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+
+ assertEquals(3, store.getStorefilesCount());
+ ScanInfo scanInfo = store.getScanInfo();
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ Collection<StoreFile> storefiles2 = store.getStorefiles();
+ ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
+ StoreScanner storeScanner =
+ (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
+ // get the current heap
+ KeyValueHeap heap = storeScanner.heap;
+ // create more store files
+ for (int i = 31; i < 40; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+
+ for (int i = 41; i < 50; i++) {
+ store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+ memStoreSize);
+ }
+ // flush them
+ flushStore(store, seqID);
+ storefiles2 = store.getStorefiles();
+ ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
+ actualStorefiles1.removeAll(actualStorefiles);
+ // Do compaction
+ List<Exception> exceptions = new ArrayList<Exception>();
+ MyThread thread = new MyThread(storeScanner);
+ thread.start();
+ store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+ thread.join();
+ KeyValueHeap heap2 = thread.getHeap();
+ assertFalse(heap.equals(heap2));
+ }
+
+ private static class MyThread extends Thread {
+ private StoreScanner scanner;
+ private KeyValueHeap heap;
+
+ public MyThread(StoreScanner scanner) {
+ this.scanner = scanner;
+ }
+
+ public KeyValueHeap getHeap() {
+ return this.heap;
+ }
+
+ public void run() {
+ scanner.trySwitchToStreamRead();
+ heap = scanner.heap;
+ }
+ }
+
private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);