You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/27 19:21:18 UTC
svn commit: r798225 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/hfile/
src/java/org/apache/hadoop/hbase/regionserver/
src/test/org/apache/hadoop/hbase/io/hfile/
src/test/org/apache/hadoop/hbase/reg...
Author: stack
Date: Mon Jul 27 17:21:17 2009
New Revision: 798225
URL: http://svn.apache.org/viewvc?rev=798225&view=rev
Log:
HBASE-1686 major compaction can create empty store files, causing AIOOB when trying to read
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Jul 27 17:21:17 2009
@@ -289,7 +289,8 @@
HBASE-1709 Thrift getRowWithColumns doesn't accept column-family only
(Mathias Lehmann via Stack)
HBASE-1692 Web UI is extremely slow / freezes up if you have many tables
-
+ HBASE-1686 major compaction can create empty store files, causing AIOOB
+ when trying to read
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java Mon Jul 27 17:21:17 2009
@@ -643,6 +643,7 @@
* @return Key as a String.
*/
public static String keyToString(final byte [] b, final int o, final int l) {
+ if (b == null) return "";
int rowlength = Bytes.toShort(b, o);
String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java Mon Jul 27 17:21:17 2009
@@ -974,13 +974,13 @@
}
/**
- * @return First key in the file.
+ * @return First key in the file. May be null if file has no entries.
*/
public byte [] getFirstKey() {
if (blockIndex == null) {
throw new RuntimeException("Block index not loaded");
}
- return blockIndex.blockKeys[0];
+ return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
}
public int getEntries() {
@@ -991,13 +991,13 @@
}
/**
- * @return Last key in the file.
+ * @return Last key in the file. May be null if file has no entries.
*/
public byte [] getLastKey() {
if (!isFileInfoLoaded()) {
throw new RuntimeException("Load file info first");
}
- return this.lastkey;
+ return this.blockIndex.isEmpty()? null: this.lastkey;
}
/**
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Jul 27 17:21:17 2009
@@ -619,19 +619,18 @@
* thread must be able to block for long periods.
*
* <p>During this time, the Store can work as usual, getting values from
- * MapFiles and writing new MapFiles from the memstore.
+ * StoreFiles and writing new StoreFiles from the memstore.
*
- * Existing MapFiles are not destroyed until the new compacted TreeMap is
+ * Existing StoreFiles are not destroyed until the new compacted StoreFile is
* completely written-out to disk.
*
- * The compactLock prevents multiple simultaneous compactions.
+ * <p>The compactLock prevents multiple simultaneous compactions.
* The structureLock prevents us from interfering with other write operations.
*
- * We don't want to hold the structureLock for the whole time, as a compact()
+ * <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
- * @param mc True to force a major compaction regardless of
- * thresholds
+ * @param mc True to force a major compaction regardless of thresholds
* @return row to split around if a split is needed, null otherwise
* @throws IOException
*/
@@ -639,29 +638,32 @@
boolean forceSplit = this.regioninfo.shouldSplit(false);
boolean majorcompaction = mc;
synchronized (compactLock) {
- long maxId = -1;
// filesToCompact are sorted oldest to newest.
- List<StoreFile> filesToCompact = null;
- filesToCompact = new ArrayList<StoreFile>(this.storefiles.values());
- if (filesToCompact.size() <= 0) {
+ List<StoreFile> filesToCompact =
+ new ArrayList<StoreFile>(this.storefiles.values());
+ if (filesToCompact.isEmpty()) {
LOG.debug(this.storeNameStr + ": no store files to compact");
return null;
}
- // The max-sequenceID in any of the to-be-compacted TreeMaps is the
- // last key of storefiles.
- maxId = this.storefiles.lastKey().longValue();
+
+ // Max-sequenceID is the last key of the storefiles TreeMap
+ long maxId = this.storefiles.lastKey().longValue();
+
// Check to see if we need to do a major compaction on this region.
// If so, change doMajorCompaction to true to skip the incremental
// compacting below. Only check if doMajorCompaction is not true.
if (!majorcompaction) {
majorcompaction = isMajorCompaction(filesToCompact);
}
+
boolean references = hasReferences(filesToCompact);
if (!majorcompaction && !references &&
(forceSplit || (filesToCompact.size() < compactionThreshold))) {
return checkSplit(forceSplit);
}
- if (!fs.exists(this.regionCompactionDir) && !fs.mkdirs(this.regionCompactionDir)) {
+
+ if (!fs.exists(this.regionCompactionDir) &&
+ !fs.mkdirs(this.regionCompactionDir)) {
LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed");
return checkSplit(forceSplit);
}
@@ -670,7 +672,7 @@
// selection.
int countOfFiles = filesToCompact.size();
long totalSize = 0;
- long[] fileSizes = new long[countOfFiles];
+ long [] fileSizes = new long[countOfFiles];
long skipped = 0;
int point = 0;
for (int i = 0; i < countOfFiles; i++) {
@@ -689,6 +691,7 @@
fileSizes[i] = len;
totalSize += len;
}
+
if (!majorcompaction && !references) {
// Here we select files for incremental compaction.
// The rule is: if the largest(oldest) one is more than twice the
@@ -719,26 +722,17 @@
}
}
- // Step through them, writing to the brand-new file
- HFile.Writer writer = getWriter(this.regionCompactionDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
- (references? ", hasReferences=true,": " ") + " into " +
- FSUtils.getPath(writer.getPath()));
- }
- try {
- compact(writer, filesToCompact, majorcompaction);
- } finally {
- // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
- StoreFile.appendMetadata(writer, maxId, majorcompaction);
- writer.close();
- }
-
+ // Ready to go. Have list of files to compact.
+ LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+ (references? ", hasReferences=true,": " ") + " into " +
+ FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
+ HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
// Move the compaction into place.
- completeCompaction(filesToCompact, writer);
+ StoreFile sf = completeCompaction(filesToCompact, writer);
if (LOG.isDebugEnabled()) {
LOG.debug("Completed" + (majorcompaction? " major ": " ") +
"compaction of " + this.storeNameStr +
+ "; new storefile is " + (sf == null? "none": sf.toString()) +
"; store size is " + StringUtils.humanReadableInt(storeSize));
}
}
@@ -834,16 +828,17 @@
* @param writer output writer
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+ * @param maxId Readers maximum sequence id.
+ * @return Product of compaction or null if all cells expired or deleted and
+ * nothing made it through the compaction.
* @throws IOException
*/
- private void compact(HFile.Writer writer,
- List<StoreFile> filesToCompact,
- boolean majorCompaction) throws IOException {
- // for each file, obtain a scanner:
+ private HFile.Writer compact(final List<StoreFile> filesToCompact,
+ final boolean majorCompaction, final long maxId)
+ throws IOException {
+ // For each file, obtain a scanner:
KeyValueScanner [] scanners = new KeyValueScanner[filesToCompact.size()];
- // init:
for (int i = 0; i < filesToCompact.size(); ++i) {
- // TODO open a new HFile.Reader w/o block cache.
Reader r = filesToCompact.get(i).getReader();
if (r == null) {
LOG.warn("StoreFile " + filesToCompact.get(i) + " has a null Reader");
@@ -853,82 +848,97 @@
scanners[i] = new StoreFileScanner(r.getScanner(false));
}
+ // Make the instantiation lazy in case compaction produces no product; i.e.
+ // where all source cells are expired or deleted.
+ HFile.Writer writer = null;
+ try {
if (majorCompaction) {
InternalScanner scanner = null;
-
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
scanner = new StoreScanner(this, scan, scanners);
-
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
- ArrayList<KeyValue> row = new ArrayList<KeyValue>();
+ ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
boolean more = true;
- while ( more ) {
- more = scanner.next(row);
+ while (more) {
+ more = scanner.next(kvs);
// output to writer:
- for (KeyValue kv : row) {
+ for (KeyValue kv : kvs) {
+ if (writer == null) writer = getWriter(this.regionCompactionDir);
writer.append(kv);
}
- row.clear();
+ kvs.clear();
}
} finally {
- if (scanner != null)
- scanner.close();
+ if (scanner != null) scanner.close();
}
} else {
MinorCompactingStoreScanner scanner = null;
try {
scanner = new MinorCompactingStoreScanner(this, scanners);
-
- while ( scanner.next(writer) ) { }
+ writer = getWriter(this.regionCompactionDir);
+ while (scanner.next(writer)) {
+ // Nothing to do
+ }
} finally {
if (scanner != null)
scanner.close();
}
}
-
+ } finally {
+ if (writer != null) {
+ StoreFile.appendMetadata(writer, maxId, majorCompaction);
+ writer.close();
+ }
+ }
+ return writer;
}
/*
* It's assumed that the compactLock will be acquired prior to calling this
* method! Otherwise, it is not thread-safe!
*
- * It works by processing a compaction that's been written to disk.
+ * <p>It works by processing a compaction that's been written to disk.
*
* <p>It is usually invoked at the end of a compaction, but might also be
* invoked at HStore startup, if the prior execution died midway through.
*
* <p>Moving the compacted TreeMap into place means:
* <pre>
- * 1) Moving the new compacted MapFile into place
- * 2) Unload all replaced MapFiles, close and collect list to delete.
+ * 1) Moving the new compacted StoreFile into place
+ * 2) Unload all replaced StoreFiles, close and collect list to delete.
* 3) Loading the new TreeMap.
* 4) Compute new store size
* </pre>
*
* @param compactedFiles list of files that were compacted
- * @param compactedFile HStoreFile that is the result of the compaction
+ * @param compactedFile StoreFile that is the result of the compaction
+ * @return StoreFile created. May be null.
* @throws IOException
*/
- private void completeCompaction(final List<StoreFile> compactedFiles,
+ private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
final HFile.Writer compactedFile)
throws IOException {
- // 1. Moving the new files into place.
- Path p = null;
- try {
- p = StoreFile.rename(this.fs, compactedFile.getPath(),
- StoreFile.getRandomFilename(fs, this.homedir));
- } catch (IOException e) {
- LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
- return;
+ // 1. Moving the new files into place -- if there is a new file (may not
+ // be if all cells were expired or deleted).
+ StoreFile result = null;
+ if (compactedFile != null) {
+ Path p = null;
+ try {
+ p = StoreFile.rename(this.fs, compactedFile.getPath(),
+ StoreFile.getRandomFilename(fs, this.homedir));
+ } catch (IOException e) {
+ LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
+ return null;
+ }
+ result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory);
}
- StoreFile finalCompactedFile = new StoreFile(this.fs, p, blockcache,
- this.conf, this.inMemory);
this.lock.writeLock().lock();
try {
try {
+ // 2. Unloading
// 3. Loading the new TreeMap.
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
@@ -939,10 +949,12 @@
this.storefiles.remove(e.getKey());
}
}
- // Add new compacted Reader and store file.
- Long orderVal = Long.valueOf(finalCompactedFile.getMaxSequenceId());
- this.storefiles.put(orderVal, finalCompactedFile);
- // Tell observers that list of Readers has changed.
+ // If a StoreFile result, move it into place. May be null.
+ if (result != null) {
+ Long orderVal = Long.valueOf(result.getMaxSequenceId());
+ this.storefiles.put(orderVal, result);
+ }
+ // Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
for (StoreFile hsf: compactedFiles) {
@@ -950,11 +962,10 @@
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files for " +
- this.storeNameStr +
- ". Compacted file is " + finalCompactedFile.toString() +
- ". Files replaced are " + compactedFiles.toString() +
- " some of which may have been already removed", e);
+ LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+ ". Compacted file is " + (result == null? "none": result.toString()) +
+ ". Files replaced " + compactedFiles.toString() +
+ " some of which may have been already removed", e);
}
// 4. Compute new store size
this.storeSize = 0L;
@@ -969,6 +980,7 @@
} finally {
this.lock.writeLock().unlock();
}
+ return result;
}
// ////////////////////////////////////////////////////////////////////////////
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java Mon Jul 27 17:21:17 2009
@@ -62,7 +62,7 @@
public void get(List<KeyValue> result) throws IOException {
for(HFileScanner scanner : this.scanners) {
this.matcher.update();
- if(getStoreFile(scanner, result) || matcher.isDone()) {
+ if (getStoreFile(scanner, result) || matcher.isDone()) {
return;
}
}
@@ -77,11 +77,13 @@
*/
public boolean getStoreFile(HFileScanner scanner, List<KeyValue> result)
throws IOException {
- if(scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
+ if (scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
startKey.getKeyLength()) == -1) {
// No keys in StoreFile at or after specified startKey
// First row may be = our row, so we have to check anyways.
byte [] firstKey = scanner.getReader().getFirstKey();
+ // Key may be null if storefile is empty.
+ if (firstKey == null) return false;
short rowLen = Bytes.toShort(firstKey, 0, Bytes.SIZEOF_SHORT);
int rowOffset = Bytes.SIZEOF_SHORT;
if (this.matcher.rowComparator.compareRows(firstKey, rowOffset, rowLen,
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/hfile/TestHFile.java Mon Jul 27 17:21:17 2009
@@ -52,6 +52,21 @@
private final int minBlockSize = 512;
private static String localFormatter = "%010d";
+ /**
+ * Test empty HFile.
+ * Test all features work reasonably when hfile is empty of entries.
+ * @throws IOException
+ */
+ public void testEmptyHFile() throws IOException {
+ Path f = new Path(ROOT_DIR, getName());
+ Writer w = new Writer(this.fs, f);
+ w.close();
+ Reader r = new Reader(fs, f, null, false);
+ r.loadFileInfo();
+ assertNull(r.getFirstKey());
+ assertNull(r.getLastKey());
+ }
+
// write some records into the tfile
// write them twice
private int writeSomeRecords(Writer writer, int start, int n)
@@ -213,7 +228,7 @@
reader.loadFileInfo();
assertNull(reader.getMetaBlock("non-existant"));
}
-
+
/**
* Make sure the orginals for our compression libs doesn't change on us.
*/
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java Mon Jul 27 17:21:17 2009
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,13 +29,16 @@
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+
/**
* Test compactions
*/
@@ -83,7 +88,41 @@
}
super.tearDown();
}
-
+
+ /**
+ * Test that on a major compaction, if all cells are expired or deleted, then
+ * we'll end up with no product. Make sure scanner over region returns
+ * right answer in this case - and that it just basically works.
+ * @throws IOException
+ */
+ public void testMajorCompactingToNoOutput() throws IOException {
+ createStoreFile(r);
+ for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
+ createStoreFile(r);
+ }
+ // Now delete everything.
+ InternalScanner s = r.getScanner(new Scan());
+ do {
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ boolean result = s.next(results);
+ r.delete(new Delete(results.get(0).getRow()), null, false);
+ if (!result) break;
+ } while(true);
+ // Flush
+ r.flushcache();
+ // Major compact.
+ r.compactStores(true);
+ s = r.getScanner(new Scan());
+ int counter = 0;
+ do {
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ boolean result = s.next(results);
+ if (!result) break;
+ counter++;
+ } while(true);
+ assertEquals(0, counter);
+ }
+
/**
* Run compaction and flushing memstore
* Assert deletes get cleaned up.
@@ -195,7 +234,7 @@
count = count();
assertTrue(count == 0);
}
-
+
private int count() throws IOException {
int count = 0;
for (StoreFile f: this.r.stores.
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=798225&r1=798224&r2=798225&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Mon Jul 27 17:21:17 2009
@@ -8,6 +8,8 @@
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
+import junit.framework.TestCase;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -16,11 +18,10 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
-import junit.framework.TestCase;
-
/**
* Test class fosr the Store
*/
@@ -87,6 +88,38 @@
//////////////////////////////////////////////////////////////////////////////
// Get tests
//////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Test for hbase-1686.
+ * @throws IOException
+ */
+ public void testEmptyStoreFile() throws IOException {
+ init(this.getName());
+ // Write a store file.
+ this.store.add(new KeyValue(row, family, qf1, null));
+ this.store.add(new KeyValue(row, family, qf2, null));
+ flush(1);
+ // Now put in place an empty store file. It's a little tricky. Have to
+ // do manually with hacked in sequence id.
+ StoreFile f = this.store.getStorefiles().firstEntry().getValue();
+ Path storedir = f.getPath().getParent();
+ long seqid = f.getMaxSequenceId();
+ HBaseConfiguration c = new HBaseConfiguration();
+ FileSystem fs = FileSystem.get(c);
+ Writer w = StoreFile.getWriter(fs, storedir);
+ StoreFile.appendMetadata(w, seqid + 1);
+ w.close();
+ this.store.close();
+ // Reopen it... should pick up two files
+ this.store = new Store(storedir.getParent().getParent(),
+ this.store.getHRegionInfo(),
+ this.store.getFamily(), fs, null, c, null);
+ System.out.println(this.store.getHRegionInfo().getEncodedName());
+ assertEquals(2, this.store.getStorefilesCount());
+ this.store.get(get, qualifiers, result);
+ assertEquals(1, result.size());
+ }
+
/**
* Getting data from memstore only
* @throws IOException