You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/15 02:04:49 UTC
svn commit: r1183570 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: nspiegelberg
Date: Sat Oct 15 00:04:48 2011
New Revision: 1183570
URL: http://svn.apache.org/viewvc?rev=1183570&view=rev
Log:
HBASE-4241 Optimize flushing of the Memstore
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Oct 15 00:04:48 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* The MemStore holds in-memory modifications to the Store. Modifications
@@ -466,6 +467,15 @@ public class MemStore implements HeapSiz
}
/**
+ * @return a singleton list containing a scanner over the given snapshot
+ */
+ public static List<KeyValueScanner> getSnapshotScanners(
+ SortedSet<KeyValue> snapshot, KeyValue.KVComparator comparator) {
+ return Collections.<KeyValueScanner>singletonList(
+ new CollectionBackedScanner(snapshot, comparator));
+ }
+
+ /**
* Check if this memstore may contain the required keys
* @param scan
* @return False if the key definitely does not exist in this Memstore
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Oct 15 00:04:48 2011
@@ -475,7 +475,7 @@ public class Store extends SchemaConfigu
* @return StoreFile created.
* @throws IOException
*/
- private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
+ private StoreFile internalFlushCache(final SortedSet<KeyValue> snapshot,
final long logCacheFlushId,
TimeRangeTracker snapshotTimeRangeTracker,
MonitoredTask status) throws IOException {
@@ -483,36 +483,53 @@ public class Store extends SchemaConfigu
String fileName;
long flushed = 0;
// Don't flush if there are no entries.
- if (set.size() == 0) {
+ if (snapshot.size() == 0) {
return null;
}
- long oldestTimestamp = System.currentTimeMillis() - ttl;
- // TODO: We can fail in the below block before we complete adding this
- // flush to list of store files. Add cleanup of anything put on filesystem
- // if we fail.
- synchronized (flushLock) {
- status.setStatus("Flushing " + this + ": creating writer");
- // A. Write the map out to the disk
- writer = createWriterInTmp(set.size());
- writer.setTimeRangeTracker(snapshotTimeRangeTracker);
- fileName = writer.getPath().getName();
- int entries = 0;
- try {
- for (KeyValue kv: set) {
- if (!isExpired(kv, oldestTimestamp)) {
- writer.append(kv);
- entries++;
- flushed += this.memstore.heapSizeChange(kv, true);
- }
+
+ // Use a store scanner from snapshot to find out which rows to flush
+ // Note that we need to retain deletes.
+ Scan scan = new Scan();
+ scan.setMaxVersions(family.getMaxVersions());
+ InternalScanner scanner = new StoreScanner(this, scan,
+ MemStore.getSnapshotScanners(snapshot, this.comparator), true);
+
+ try {
+ // TODO: We can fail in the below block before we complete adding this
+ // flush to list of store files. Add cleanup of anything put on filesystem
+ // if we fail.
+ synchronized (flushLock) {
+ status.setStatus("Flushing " + this + ": creating writer");
+ // A. Write the map out to the disk
+ writer = createWriterInTmp(snapshot.size());
+ writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+ fileName = writer.getPath().getName();
+ int entries = 0;
+ try {
+ final List<KeyValue> kvs = new ArrayList<KeyValue>();
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(kvs);
+ if (!kvs.isEmpty()) {
+ for (KeyValue kv : kvs) {
+ writer.append(kv);
+ entries++;
+ flushed += this.memstore.heapSizeChange(kv, true);
+ }
+ kvs.clear();
+ }
+ } while (hasMore);
+ } finally {
+ // Write out the log sequence number that corresponds to this output
+ // hfile. The hfile is current up to and including logCacheFlushId.
+ status.setStatus("Flushing " + this + ": appending metadata");
+ writer.appendMetadata(logCacheFlushId, false);
+ status.setStatus("Flushing " + this + ": closing flushed file");
+ writer.close();
}
- } finally {
- // Write out the log sequence number that corresponds to this output
- // hfile. The hfile is current up to and including logCacheFlushId.
- status.setStatus("Flushing " + this + ": appending metadata");
- writer.appendMetadata(logCacheFlushId, false);
- status.setStatus("Flushing " + this + ": closing flushed file");
- writer.close();
}
+ } finally {
+ scanner.close();
}
Path dstPath = new Path(homedir, fileName);
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java Sat Oct 15 00:04:48 2011
@@ -21,11 +21,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Collections;
import java.util.List;
/**
@@ -34,21 +33,11 @@ import java.util.List;
* to the provided comparator, and then the whole thing pretends
* to be a store file scanner.
*/
-public class KeyValueScanFixture extends NonLazyKeyValueScanner {
- ArrayList<KeyValue> data;
- Iterator<KeyValue> iter = null;
- KeyValue current = null;
- KeyValue.KVComparator comparator;
+public class KeyValueScanFixture extends CollectionBackedScanner {
public KeyValueScanFixture(KeyValue.KVComparator comparator,
KeyValue... incData) {
- this.comparator = comparator;
-
- data = new ArrayList<KeyValue>(incData.length);
- for( int i = 0; i < incData.length ; ++i) {
- data.add(incData[i]);
- }
- Collections.sort(data, this.comparator);
+ super(comparator, incData);
}
public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@@ -58,54 +47,4 @@ public class KeyValueScanFixture extends
}
return scanners;
}
-
-
- @Override
- public KeyValue peek() {
- return this.current;
- }
-
- @Override
- public KeyValue next() {
- KeyValue res = current;
-
- if (iter.hasNext())
- current = iter.next();
- else
- current = null;
- return res;
- }
-
- @Override
- public boolean seek(KeyValue key) {
- // start at beginning.
- iter = data.iterator();
- int cmp;
- KeyValue kv = null;
- do {
- if (!iter.hasNext()) {
- current = null;
- return false;
- }
- kv = iter.next();
- cmp = comparator.compare(key, kv);
- } while (cmp > 0);
- current = kv;
- return true;
- }
-
- @Override
- public boolean reseek(KeyValue key) {
- return seek(key);
- }
-
- @Override
- public void close() {
- // noop.
- }
-
- @Override
- public long getSequenceID() {
- return 0;
- }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java Sat Oct 15 00:04:48 2011
@@ -338,4 +338,29 @@ public class TestBlocksRead extends HBas
verifyData(kvs[1], "row", "col2", 12);
verifyData(kvs[2], "row", "col3", 13);
}
+
+ @Test
+ public void testLazySeekBlocksReadWithDelete() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testLazySeekBlocksReadWithDelete");
+ byte [] FAMILY = Bytes.toBytes("cf1");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+ KeyValue kvs[];
+
+ HBaseConfiguration conf = getConf();
+ initHRegion(TABLE, getName(), conf, FAMILIES);
+
+ deleteFamily(FAMILY, "row", 200);
+ for (int i = 0 ; i < 100; i++) {
+ putData(FAMILY, "row", "col"+i, i);
+ }
+ putData(FAMILY, "row", "col99", 201);
+
+ region.flushcache();
+ kvs = getData(FAMILY, "row", Arrays.asList("col0"), 2);
+ assertEquals(0, kvs.length);
+
+ kvs = getData(FAMILY, "row", Arrays.asList("col99"), 2);
+ assertEquals(1, kvs.length);
+ verifyData(kvs[0], "row", "col99", 201);
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Sat Oct 15 00:04:48 2011
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
public class TestKeyValueHeap extends HBaseTestCase {
@@ -67,25 +68,36 @@ public class TestKeyValueHeap extends HB
//Cases that need to be checked are:
//1. The "smallest" KeyValue is in the same scanners as current
//2. Current scanner gets empty
-
+ KeyValue startKV;
+ Scanner scanner;
+
List<KeyValue> l1 = new ArrayList<KeyValue>();
- l1.add(new KeyValue(row1, fam1, col5, data));
+ startKV = new KeyValue(row1, fam1, col5, data);
+ l1.add(startKV);
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
- scanners.add(new Scanner(l1));
+ scanner = new Scanner(l1);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l2 = new ArrayList<KeyValue>();
+ startKV = new KeyValue(row1, fam1, col1, data);
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
- scanners.add(new Scanner(l2));
+ scanner = new Scanner(l2);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l3 = new ArrayList<KeyValue>();
+ startKV = new KeyValue(row1, fam1, col3, data);
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
- scanners.add(new Scanner(l3));
+ scanner = new Scanner(l3);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(new KeyValue(row1, fam1, col1, data));
@@ -129,25 +141,36 @@ public class TestKeyValueHeap extends HB
//Cases:
//1. Seek KeyValue that is not in scanner
//2. Check that smallest that is returned from a seek is correct
-
+ KeyValue startKV ;
+ Scanner scanner;
+
List<KeyValue> l1 = new ArrayList<KeyValue>();
- l1.add(new KeyValue(row1, fam1, col5, data));
+ startKV = new KeyValue(row1, fam1, col5, data);
+ l1.add(startKV);
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
- scanners.add(new Scanner(l1));
+ scanner = new Scanner(l1);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l2 = new ArrayList<KeyValue>();
+ startKV = new KeyValue(row1, fam1, col1, data);
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
- scanners.add(new Scanner(l2));
+ scanner = new Scanner(l2);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l3 = new ArrayList<KeyValue>();
+ startKV = new KeyValue(row1, fam1, col3, data);
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
- scanners.add(new Scanner(l3));
+ scanner = new Scanner(l3);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(new KeyValue(row2, fam1, col1, data));
@@ -175,25 +198,36 @@ public class TestKeyValueHeap extends HB
public void testScannerLeak() throws IOException {
// Test for unclosed scanners (HBASE-1927)
-
+ KeyValue startKV ;
+ Scanner scanner;
+
List<KeyValue> l1 = new ArrayList<KeyValue>();
- l1.add(new KeyValue(row1, fam1, col5, data));
+ startKV = new KeyValue(row1, fam1, col5, data);
+ l1.add(startKV);
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
- scanners.add(new Scanner(l1));
+ scanner = new Scanner(l1);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l2 = new ArrayList<KeyValue>();
+ startKV = new KeyValue(row1, fam1, col1, data);
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
- scanners.add(new Scanner(l2));
+ scanner = new Scanner(l2);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l3 = new ArrayList<KeyValue>();
+ startKV = new KeyValue(row1, fam1, col3, data);
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
- scanners.add(new Scanner(l3));
+ scanner = new Scanner(l3);
+ scanner.seek(startKV);
+ scanners.add(scanner);
List<KeyValue> l4 = new ArrayList<KeyValue>();
scanners.add(new Scanner(l4));
@@ -203,66 +237,15 @@ public class TestKeyValueHeap extends HB
while(kvh.next() != null);
- for(KeyValueScanner scanner : scanners) {
- assertTrue(((Scanner)scanner).isClosed());
+ for(KeyValueScanner kvScanner : scanners) {
+ assertTrue(((Scanner)kvScanner).isClosed());
}
}
- private static class Scanner extends NonLazyKeyValueScanner {
- private Iterator<KeyValue> iter;
- private KeyValue current;
- private boolean closed = false;
-
+ private static class Scanner extends CollectionBackedScanner {
+
public Scanner(List<KeyValue> list) {
- Collections.sort(list, KeyValue.COMPARATOR);
- iter = list.iterator();
- if(iter.hasNext()){
- current = iter.next();
- }
- }
-
- public KeyValue peek() {
- return current;
- }
-
- public KeyValue next() {
- KeyValue oldCurrent = current;
- if(iter.hasNext()){
- current = iter.next();
- } else {
- current = null;
- }
- return oldCurrent;
- }
-
- public void close(){
- closed = true;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public boolean seek(KeyValue seekKv) {
- while(iter.hasNext()){
- KeyValue next = iter.next();
- int ret = KeyValue.COMPARATOR.compare(next, seekKv);
- if(ret >= 0){
- current = next;
- return true;
- }
- }
- return false;
- }
-
- @Override
- public boolean reseek(KeyValue key) throws IOException {
- return seek(key);
- }
-
- @Override
- public long getSequenceID() {
- return 0;
+ super(list,KeyValue.COMPARATOR);
}
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Sat Oct 15 00:04:48 2011
@@ -92,6 +92,8 @@ public class TestStore extends TestCase
private static final String DIR = HBaseTestingUtility.getTestDir() + "/TestStore/";
+ private static final int MAX_VERSION = 4;
+
/**
* Setup
* @throws IOException
@@ -121,8 +123,10 @@ public class TestStore extends TestCase
Path logdir = new Path(DIR+methodName+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(family);
+ // with HBASE-4241, lower versions are collected on flush
+ hcd.setMaxVersions(TestStore.MAX_VERSION);
FileSystem fs = FileSystem.get(conf);
-
+
fs.delete(logdir, true);
HTableDescriptor htd = new HTableDescriptor(table);
@@ -549,6 +553,8 @@ public class TestStore extends TestCase
this.store.add(kv);
}
+ assertEquals(Math.max(timestamps1.length, timestamps2.length),
+ TestStore.MAX_VERSION);
List<KeyValue> result;
Get get = new Get(Bytes.toBytes(1));
get.addColumn(family,qf1);