You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/16 19:41:00 UTC
svn commit: r1373943 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/
Author: larsh
Date: Thu Aug 16 17:40:59 2012
New Revision: 1373943
URL: http://svn.apache.org/viewvc?rev=1373943&view=rev
Log:
HBASE-6561 Gets/Puts with many columns send the RegionServer into an 'endless' loop
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Aug 16 17:40:59 2012
@@ -202,6 +202,9 @@ public final class HConstants {
/** Parameter name for how often a region should should perform a major compaction */
public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction";
+ /** Parameter name for the maximum batch of KVs to be used in flushes and compactions */
+ public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
+
/** Parameter name for HBase instance root directory */
public static final String HBASE_DIR = "hbase.rootdir";
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Thu Aug 16 17:40:59 2012
@@ -110,7 +110,7 @@ class Compactor extends Configured {
.getScannersForStoreFiles(filesToCompact, false, false, true);
// Get some configs
- int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+ int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
Compression.Algorithm compression = store.getFamily().getCompression();
// Avoid overriding compression setting for major compactions if the user
// has not specified it separately
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Aug 16 17:40:59 2012
@@ -674,6 +674,10 @@ public class MemStore implements HeapSiz
private KeyValue kvsetNextRow = null;
private KeyValue snapshotNextRow = null;
+ // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
+ private KeyValue kvsetItRow = null;
+ private KeyValue snapshotItRow = null;
+
// iterator based scanning.
private Iterator<KeyValue> kvsetIt;
private Iterator<KeyValue> snapshotIt;
@@ -682,10 +686,6 @@ public class MemStore implements HeapSiz
private KeyValueSkipListSet kvsetAtCreation;
private KeyValueSkipListSet snapshotAtCreation;
- // Sub lists on which we're iterating
- private SortedSet<KeyValue> kvTail;
- private SortedSet<KeyValue> snapshotTail;
-
// the pre-calculated KeyValue to be returned by peek() or next()
private KeyValue theNext;
@@ -717,17 +717,29 @@ public class MemStore implements HeapSiz
snapshotAtCreation = snapshot;
}
- protected KeyValue getNext(Iterator<KeyValue> it) {
+ private KeyValue getNext(Iterator<KeyValue> it) {
long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
- while (it.hasNext()) {
- KeyValue v = it.next();
- if (v.getMemstoreTS() <= readPoint) {
- return v;
+ KeyValue v = null;
+ try {
+ while (it.hasNext()) {
+ v = it.next();
+ if (v.getMemstoreTS() <= readPoint) {
+ return v;
+ }
}
- }
- return null;
+ return null;
+ } finally {
+ if (v != null) {
+ // in all cases, remember the last KV iterated to
+ if (it == snapshotIt) {
+ snapshotItRow = v;
+ } else {
+ kvsetItRow = v;
+ }
+ }
+ }
}
/**
@@ -746,8 +758,10 @@ public class MemStore implements HeapSiz
// kvset and snapshot will never be null.
// if tailSet can't find anything, SortedSet is empty (not null).
- kvTail = kvsetAtCreation.tailSet(key);
- snapshotTail = snapshotAtCreation.tailSet(key);
+ kvsetIt = kvsetAtCreation.tailSet(key).iterator();
+ snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+ kvsetItRow = null;
+ snapshotItRow = null;
return seekInSubLists(key);
}
@@ -757,9 +771,6 @@ public class MemStore implements HeapSiz
* (Re)initialize the iterators after a seek or a reseek.
*/
private synchronized boolean seekInSubLists(KeyValue key){
- kvsetIt = kvTail.iterator();
- snapshotIt = snapshotTail.iterator();
-
kvsetNextRow = getNext(kvsetIt);
snapshotNextRow = getNext(snapshotIt);
@@ -779,25 +790,20 @@ public class MemStore implements HeapSiz
@Override
public synchronized boolean reseek(KeyValue key) {
/*
- See HBASE-4195 & HBASE-3855 for the background on this implementation.
+ See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
This code is executed concurrently with flush and puts, without locks.
Two points must be known when working on this code:
1) It's not possible to use the 'kvTail' and 'snapshot'
variables, as they are modified during a flush.
- 2) The ideal implementation for performances would use the sub skip list
+ 2) The ideal implementation for performance would use the sub skip list
implicitly pointed by the iterators 'kvsetIt' and
'snapshotIt'. Unfortunately the Java API does not offer a method to
- get it. So we're using the skip list that we kept when we created
- the iterators. As these iterators could have been moved forward after
- their creation, we're doing a kind of rewind here. It has a small
- performance impact (we're using a wider list than necessary), and we
- could see values that were not here when we read the list the first
- time. We expect that the new values will be skipped by the test on
- readpoint performed in the next() function.
+ get it. So we remember the last keys we iterated to and restore
+ the reseeked set to at least that point.
*/
- kvTail = kvTail.tailSet(key);
- snapshotTail = snapshotTail.tailSet(key);
+ kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
+ snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
return seekInSubLists(key);
}
@@ -838,7 +844,7 @@ public class MemStore implements HeapSiz
* This uses comparator.compare() to compare the KeyValue using the memstore
* comparator.
*/
- protected KeyValue getLowest(KeyValue first, KeyValue second) {
+ private KeyValue getLowest(KeyValue first, KeyValue second) {
if (first == null && second == null) {
return null;
}
@@ -849,12 +855,31 @@ public class MemStore implements HeapSiz
return (first != null ? first : second);
}
+ /*
+ * Returns the higher of the two key values, or null if they are both null.
+ * This uses comparator.compare() to compare the KeyValue using the memstore
+ * comparator.
+ */
+ private KeyValue getHighest(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare > 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
public synchronized void close() {
this.kvsetNextRow = null;
this.snapshotNextRow = null;
this.kvsetIt = null;
this.snapshotIt = null;
+
+ this.kvsetItRow = null;
+ this.snapshotItRow = null;
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Aug 16 17:40:59 2012
@@ -705,9 +705,9 @@ public class Store extends SchemaConfigu
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
- scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner(
- set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
- HConstants.OLDEST_TIMESTAMP);
+ scanner = new StoreScanner(this, scanInfo, scan,
+ Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
+ this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
if (getHRegion().getCoprocessorHost() != null) {
InternalScanner cpScanner =
@@ -719,6 +719,7 @@ public class Store extends SchemaConfigu
scanner = cpScanner;
}
try {
+ int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
@@ -732,7 +733,7 @@ public class Store extends SchemaConfigu
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
- hasMore = scanner.next(kvs);
+ hasMore = scanner.next(kvs, compactionKVMax);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us