You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/08/25 18:35:15 UTC
svn commit: r1161634 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/util/
src/test/java/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Thu Aug 25 16:35:15 2011
New Revision: 1161634
URL: http://svn.apache.org/viewvc?rev=1161634&view=rev
Log:
HBASE-4241 Optimize flushing of the Memstore
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Aug 25 16:35:15 2011
@@ -403,6 +403,7 @@ Release 0.91.0 - Unreleased
HBASE-4199 blockCache summary - backend (Doug Meil)
HBASE-4240 Allow Loadbalancer to be pluggable
HBASE-4244 Refactor bin/hbase help
+ HBASE-4241 Optimize flushing of the Memstore (Lars Hofhansl)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Aug 25 16:35:15 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.monitorin
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
@@ -90,6 +91,7 @@ public class Store implements HeapSize {
// ttl in milliseconds.
protected long ttl;
protected int minVersions;
+ protected int maxVersions;
long majorCompactionTime;
private final int minFilesToCompact;
private final int maxFilesToCompact;
@@ -178,6 +180,7 @@ public class Store implements HeapSize {
this.ttl *= 1000;
}
this.minVersions = family.getMinVersions();
+ this.maxVersions = family.getMaxVersions();
this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = Bytes.toString(this.family.getName());
@@ -481,34 +484,45 @@ public class Store implements HeapSize {
if (set.size() == 0) {
return null;
}
- long oldestTimestamp = System.currentTimeMillis() - ttl;
- // TODO: We can fail in the below block before we complete adding this
- // flush to list of store files. Add cleanup of anything put on filesystem
- // if we fail.
- synchronized (flushLock) {
- status.setStatus("Flushing " + this + ": creating writer");
- // A. Write the map out to the disk
- writer = createWriterInTmp(set.size());
- writer.setTimeRangeTracker(snapshotTimeRangeTracker);
- try {
- for (KeyValue kv: set) {
- // If minVersion > 0 we will wait until the next compaction to
- // collect expired KVs. (following the logic for maxVersions).
- // TODO: As Jonathan Gray points this can be optimized
- // (see HBASE-4241)
- if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
- writer.append(kv);
- flushed += this.memstore.heapSizeChange(kv, true);
+ Scan scan = new Scan();
+ scan.setMaxVersions(maxVersions);
+ // Use a store scanner to find which rows to flush.
+ // Note that we need to retain deletes, hence
+ // pass true as the StoreScanner's retainDeletesInOutput argument.
+ InternalScanner scanner = new StoreScanner(this, scan,
+ Collections.singletonList(new CollectionBackedScanner(set,
+ this.comparator)), true);
+ try {
+ // TODO: We can fail in the below block before we complete adding this
+ // flush to list of store files. Add cleanup of anything put on filesystem
+ // if we fail.
+ synchronized (flushLock) {
+ status.setStatus("Flushing " + this + ": creating writer");
+ // A. Write the map out to the disk
+ writer = createWriterInTmp(set.size());
+ writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+ try {
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ while (scanner.next(kvs)) {
+ if (!kvs.isEmpty()) {
+ for (KeyValue kv : kvs) {
+ writer.append(kv);
+ flushed += this.memstore.heapSizeChange(kv, true);
+ }
+ kvs.clear();
+ }
}
+ } finally {
+ // Write out the log sequence number that corresponds to this output
+ // hfile. The hfile is current up to and including logCacheFlushId.
+ status.setStatus("Flushing " + this + ": appending metadata");
+ writer.appendMetadata(logCacheFlushId, false);
+ status.setStatus("Flushing " + this + ": closing flushed file");
+ writer.close();
}
- } finally {
- // Write out the log sequence number that corresponds to this output
- // hfile. The hfile is current up to and including logCacheFlushId.
- status.setStatus("Flushing " + this + ": appending metadata");
- writer.appendMetadata(logCacheFlushId, false);
- status.setStatus("Flushing " + this + ": closing flushed file");
- writer.close();
}
+ } finally {
+ scanner.close();
}
// Write-out finished successfully, move into the right spot
@@ -1734,7 +1748,7 @@ public class Store implements HeapSize {
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
(8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
- (5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
+ (6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java?rev=1161634&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java Thu Aug 25 16:35:15 2011
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+/**
+ * Utility scanner that wraps a sortable collection and serves
+ * as a KeyValueScanner.
+ * <p>
+ * KeyValues are returned in {@code comparator} order. Lists and arrays are
+ * sorted on construction; a {@link SortedSet} is iterated as-is, so its own
+ * ordering is assumed to agree with {@code comparator}. Sets and lists are
+ * referenced rather than copied, so they should not be modified while this
+ * scanner is in use.
+ */
+public class CollectionBackedScanner implements KeyValueScanner {
+  final private Iterable<KeyValue> data;
+  final KeyValue.KVComparator comparator;
+  /** Iterator over {@link #data}, positioned just past {@link #current}. */
+  private Iterator<KeyValue> iter;
+  /** KeyValue the next call to {@link #next()} returns; null once exhausted. */
+  private KeyValue current;
+
+  /**
+   * Wraps a sorted set, using {@link KeyValue#COMPARATOR} for seeks.
+   * @param set the KeyValues to scan; referenced, not copied
+   */
+  public CollectionBackedScanner(SortedSet<KeyValue> set) {
+    this(set, KeyValue.COMPARATOR);
+  }
+
+  /**
+   * Wraps a sorted set.
+   * @param set the KeyValues to scan; referenced, not copied, and assumed to
+   *          already be ordered consistently with {@code comparator}
+   * @param comparator ordering used by seek and reseek
+   */
+  public CollectionBackedScanner(SortedSet<KeyValue> set,
+      KeyValue.KVComparator comparator) {
+    this.comparator = comparator;
+    this.data = set;
+    init();
+  }
+
+  /**
+   * Wraps a list, sorting it with {@link KeyValue#COMPARATOR}.
+   * @param list the KeyValues to scan; sorted in place and referenced, not copied
+   */
+  public CollectionBackedScanner(List<KeyValue> list) {
+    this(list, KeyValue.COMPARATOR);
+  }
+
+  /**
+   * Wraps a list, sorting it with {@code comparator}.
+   * @param list the KeyValues to scan; sorted in place and referenced, not copied
+   * @param comparator ordering used for the sort and by seek and reseek
+   */
+  public CollectionBackedScanner(List<KeyValue> list,
+      KeyValue.KVComparator comparator) {
+    Collections.sort(list, comparator);
+    this.comparator = comparator;
+    this.data = list;
+    init();
+  }
+
+  /**
+   * Wraps a private, sorted copy of the given KeyValues.
+   * @param comparator ordering used for the sort and by seek and reseek
+   * @param array the KeyValues to scan; the array itself is not modified
+   */
+  public CollectionBackedScanner(KeyValue.KVComparator comparator,
+      KeyValue... array) {
+    this.comparator = comparator;
+    List<KeyValue> tmp = new ArrayList<KeyValue>(array.length);
+    Collections.addAll(tmp, array);
+    Collections.sort(tmp, comparator);
+    this.data = tmp;
+    init();
+  }
+
+  /** Positions the scanner on the first KeyValue of the collection, if any. */
+  private void init() {
+    iter = data.iterator();
+    current = iter.hasNext() ? iter.next() : null;
+  }
+
+  /** @return the current KeyValue without advancing, or null if exhausted */
+  @Override
+  public KeyValue peek() {
+    return current;
+  }
+
+  /**
+   * Returns the current KeyValue and advances to the following one.
+   * @return the KeyValue that was current, or null if exhausted
+   */
+  @Override
+  public KeyValue next() {
+    KeyValue oldCurrent = current;
+    current = iter.hasNext() ? iter.next() : null;
+    return oldCurrent;
+  }
+
+  /**
+   * Positions the scanner on the first KeyValue at or after {@code seekKv},
+   * searching from the beginning of the collection.
+   * @return true if such a KeyValue exists; otherwise the scanner is exhausted
+   */
+  @Override
+  public boolean seek(KeyValue seekKv) {
+    // Restart iteration and forget the old position, so reseek-style
+    // short-circuiting on a stale 'current' cannot happen here.
+    iter = data.iterator();
+    current = null;
+    return advanceTo(seekKv);
+  }
+
+  /**
+   * Positions the scanner on the first KeyValue at or after {@code seekKv},
+   * searching forward from the current position.
+   * @return true if such a KeyValue exists; otherwise the scanner is exhausted
+   */
+  @Override
+  public boolean reseek(KeyValue seekKv) {
+    // 'current' has already been taken off the iterator; check it first or a
+    // reseek to a key at or before it would silently skip it.
+    if (current != null && comparator.compare(current, seekKv) >= 0) {
+      return true;
+    }
+    return advanceTo(seekKv);
+  }
+
+  /**
+   * Consumes the iterator until a KeyValue at or after {@code seekKv} is found
+   * and makes it current.
+   * @return true if one was found; false (with current set to null) otherwise
+   */
+  private boolean advanceTo(KeyValue seekKv) {
+    while (iter.hasNext()) {
+      KeyValue next = iter.next();
+      if (comparator.compare(next, seekKv) >= 0) {
+        current = next;
+        return true;
+      }
+    }
+    // Exhausted: clear the position so peek()/next() do not return a stale value.
+    current = null;
+    return false;
+  }
+
+  /** @return always 0; the wrapped collection carries no sequence id */
+  @Override
+  public long getSequenceID() {
+    return 0;
+  }
+
+  /** Nothing to release; the wrapped collection is left untouched. */
+  @Override
+  public void close() {
+    // do nothing
+  }
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java Thu Aug 25 16:35:15 2011
@@ -21,11 +21,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Collections;
import java.util.List;
/**
@@ -34,21 +33,10 @@ import java.util.List;
* to the provided comparator, and then the whole thing pretends
* to be a store file scanner.
*/
-public class KeyValueScanFixture implements KeyValueScanner {
- ArrayList<KeyValue> data;
- Iterator<KeyValue> iter = null;
- KeyValue current = null;
- KeyValue.KVComparator comparator;
-
+public class KeyValueScanFixture extends CollectionBackedScanner {
public KeyValueScanFixture(KeyValue.KVComparator comparator,
KeyValue... incData) {
- this.comparator = comparator;
-
- data = new ArrayList<KeyValue>(incData.length);
- for( int i = 0; i < incData.length ; ++i) {
- data.add(incData[i]);
- }
- Collections.sort(data, this.comparator);
+ super(comparator, incData);
}
public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@@ -58,54 +46,4 @@ public class KeyValueScanFixture impleme
}
return scanners;
}
-
-
- @Override
- public KeyValue peek() {
- return this.current;
- }
-
- @Override
- public KeyValue next() {
- KeyValue res = current;
-
- if (iter.hasNext())
- current = iter.next();
- else
- current = null;
- return res;
- }
-
- @Override
- public boolean seek(KeyValue key) {
- // start at beginning.
- iter = data.iterator();
- int cmp;
- KeyValue kv = null;
- do {
- if (!iter.hasNext()) {
- current = null;
- return false;
- }
- kv = iter.next();
- cmp = comparator.compare(key, kv);
- } while (cmp > 0);
- current = kv;
- return true;
- }
-
- @Override
- public boolean reseek(KeyValue key) {
- return seek(key);
- }
-
- @Override
- public void close() {
- // noop.
- }
-
- @Override
- public long getSequenceID() {
- return 0;
- }
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Thu Aug 25 16:35:15 2011
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
public class TestKeyValueHeap extends HBaseTestCase {
@@ -208,31 +209,11 @@ public class TestKeyValueHeap extends HB
}
}
- private static class Scanner implements KeyValueScanner {
- private Iterator<KeyValue> iter;
- private KeyValue current;
+ private static class Scanner extends CollectionBackedScanner {
private boolean closed = false;
public Scanner(List<KeyValue> list) {
- Collections.sort(list, KeyValue.COMPARATOR);
- iter = list.iterator();
- if(iter.hasNext()){
- current = iter.next();
- }
- }
-
- public KeyValue peek() {
- return current;
- }
-
- public KeyValue next() {
- KeyValue oldCurrent = current;
- if(iter.hasNext()){
- current = iter.next();
- } else {
- current = null;
- }
- return oldCurrent;
+ super(list);
}
public void close(){
@@ -242,28 +223,6 @@ public class TestKeyValueHeap extends HB
public boolean isClosed() {
return closed;
}
-
- public boolean seek(KeyValue seekKv) {
- while(iter.hasNext()){
- KeyValue next = iter.next();
- int ret = KeyValue.COMPARATOR.compare(next, seekKv);
- if(ret >= 0){
- current = next;
- return true;
- }
- }
- return false;
- }
-
- @Override
- public boolean reseek(KeyValue key) throws IOException {
- return seek(key);
- }
-
- @Override
- public long getSequenceID() {
- return 0;
- }
}
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java Thu Aug 25 16:35:15 2011
@@ -42,8 +42,6 @@ public class TestKeyValueScanFixture ext
KeyValueScanner scan = new KeyValueScanFixture(
KeyValue.COMPARATOR, kvs);
- // test simple things.
- assertNull(scan.peek());
KeyValue kv = KeyValue.createFirstOnRow(Bytes.toBytes("RowA"));
// should seek to this:
assertTrue(scan.seek(kv));
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java Thu Aug 25 16:35:15 2011
@@ -315,14 +315,12 @@ public class TestMinVersions extends HBa
// now flush
region.flushcache();
- region.compactStores();
- // oldest version still exists
- // flushing/minor compactions can't get rid of these, anymore
+ // with HBASE-4241 a flush will eliminate the expired rows
g = new Get(T1);
g.setTimeRange(0L, ts-2);
r = region.get(g, null);
- checkResult(r, c0, T1);
+ assertTrue(r.isEmpty());
// major compaction
region.compactStores(true);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Thu Aug 25 16:35:15 2011
@@ -122,6 +122,9 @@ public class TestStore extends TestCase
Path logdir = new Path(DIR+methodName+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(family);
+ // some of the tests write 4 versions and then flush
+ // (with HBASE-4241, lower versions are collected on flush)
+ hcd.setMaxVersions(4);
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);