You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/08/16 19:41:00 UTC

svn commit: r1373943 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/

Author: larsh
Date: Thu Aug 16 17:40:59 2012
New Revision: 1373943

URL: http://svn.apache.org/viewvc?rev=1373943&view=rev
Log:
HBASE-6561 Gets/Puts with many columns send the RegionServer into an 'endless' loop

Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Aug 16 17:40:59 2012
@@ -202,6 +202,9 @@ public final class HConstants {
   /** Parameter name for how often a region should should perform a major compaction */
   public static final String MAJOR_COMPACTION_PERIOD = "hbase.hregion.majorcompaction";
 
+  /** Parameter name for the maximum batch of KVs to be used in flushes and compactions */
+  public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
+
   /** Parameter name for HBase instance root directory */
   public static final String HBASE_DIR = "hbase.rootdir";
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Thu Aug 16 17:40:59 2012
@@ -110,7 +110,7 @@ class Compactor extends Configured {
       .getScannersForStoreFiles(filesToCompact, false, false, true);
 
     // Get some configs
-    int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
+    int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
     Compression.Algorithm compression = store.getFamily().getCompression();
     // Avoid overriding compression setting for major compactions if the user
     // has not specified it separately

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Aug 16 17:40:59 2012
@@ -674,6 +674,10 @@ public class MemStore implements HeapSiz
     private KeyValue kvsetNextRow = null;
     private KeyValue snapshotNextRow = null;
 
+    // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
+    private KeyValue kvsetItRow = null;
+    private KeyValue snapshotItRow = null;
+    
     // iterator based scanning.
     private Iterator<KeyValue> kvsetIt;
     private Iterator<KeyValue> snapshotIt;
@@ -682,10 +686,6 @@ public class MemStore implements HeapSiz
     private KeyValueSkipListSet kvsetAtCreation;
     private KeyValueSkipListSet snapshotAtCreation;
 
-    // Sub lists on which we're iterating
-    private SortedSet<KeyValue> kvTail;
-    private SortedSet<KeyValue> snapshotTail;
-
     // the pre-calculated KeyValue to be returned by peek() or next()
     private KeyValue theNext;
 
@@ -717,17 +717,29 @@ public class MemStore implements HeapSiz
       snapshotAtCreation = snapshot;
     }
 
-    protected KeyValue getNext(Iterator<KeyValue> it) {
+    private KeyValue getNext(Iterator<KeyValue> it) {
       long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
 
-      while (it.hasNext()) {
-        KeyValue v = it.next();
-        if (v.getMemstoreTS() <= readPoint) {
-          return v;
+      KeyValue v = null;
+      try {
+        while (it.hasNext()) {
+          v = it.next();
+          if (v.getMemstoreTS() <= readPoint) {
+            return v;
+          }
         }
-      }
 
-      return null;
+        return null;
+      } finally {
+        if (v != null) {
+          // in all cases, remember the last KV iterated to
+          if (it == snapshotIt) {
+            snapshotItRow = v;
+          } else {
+            kvsetItRow = v;
+          }
+        }
+      }
     }
 
     /**
@@ -746,8 +758,10 @@ public class MemStore implements HeapSiz
 
       // kvset and snapshot will never be null.
       // if tailSet can't find anything, SortedSet is empty (not null).
-      kvTail = kvsetAtCreation.tailSet(key);
-      snapshotTail = snapshotAtCreation.tailSet(key);
+      kvsetIt = kvsetAtCreation.tailSet(key).iterator();
+      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
+      kvsetItRow = null;
+      snapshotItRow = null;
 
       return seekInSubLists(key);
     }
@@ -757,9 +771,6 @@ public class MemStore implements HeapSiz
      * (Re)initialize the iterators after a seek or a reseek.
      */
     private synchronized boolean seekInSubLists(KeyValue key){
-      kvsetIt = kvTail.iterator();
-      snapshotIt = snapshotTail.iterator();
-
       kvsetNextRow = getNext(kvsetIt);
       snapshotNextRow = getNext(snapshotIt);
 
@@ -779,25 +790,20 @@ public class MemStore implements HeapSiz
     @Override
     public synchronized boolean reseek(KeyValue key) {
       /*
-      See HBASE-4195 & HBASE-3855 for the background on this implementation.
+      See HBASE-4195 & HBASE-3855 & HBASE-6561 for the background on this implementation.
       This code is executed concurrently with flush and puts, without locks.
       Two points must be known when working on this code:
       1) It's not possible to use the 'kvTail' and 'snapshot'
        variables, as they are modified during a flush.
-      2) The ideal implementation for performances would use the sub skip list
+      2) The ideal implementation for performance would use the sub skip list
        implicitly pointed by the iterators 'kvsetIt' and
        'snapshotIt'. Unfortunately the Java API does not offer a method to
-       get it. So we're using the skip list that we kept when we created
-       the iterators. As these iterators could have been moved forward after
-       their creation, we're doing a kind of rewind here. It has a small
-       performance impact (we're using a wider list than necessary), and we
-       could see values that were not here when we read the list the first
-       time. We expect that the new values will be skipped by the test on
-       readpoint performed in the next() function.
+       get it. So we remember the last keys we iterated to and restore
+       the reseeked set to at least that point.
        */
 
-      kvTail = kvTail.tailSet(key);
-      snapshotTail = snapshotTail.tailSet(key);
+      kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
+      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
 
       return seekInSubLists(key);
     }
@@ -838,7 +844,7 @@ public class MemStore implements HeapSiz
      * This uses comparator.compare() to compare the KeyValue using the memstore
      * comparator.
      */
-    protected KeyValue getLowest(KeyValue first, KeyValue second) {
+    private KeyValue getLowest(KeyValue first, KeyValue second) {
       if (first == null && second == null) {
         return null;
       }
@@ -849,12 +855,31 @@ public class MemStore implements HeapSiz
       return (first != null ? first : second);
     }
 
+    /*
+     * Returns the higher of the two key values, or null if they are both null.
+     * This uses comparator.compare() to compare the KeyValue using the memstore
+     * comparator.
+     */
+    private KeyValue getHighest(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare > 0 ? first : second);
+      }
+      return (first != null ? first : second);
+    }
+
     public synchronized void close() {
       this.kvsetNextRow = null;
       this.snapshotNextRow = null;
 
       this.kvsetIt = null;
       this.snapshotIt = null;
+
+      this.kvsetItRow = null;
+      this.snapshotItRow = null;
     }
 
     /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1373943&r1=1373942&r2=1373943&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Aug 16 17:40:59 2012
@@ -705,9 +705,9 @@ public class Store extends SchemaConfigu
     if (scanner == null) {
       Scan scan = new Scan();
       scan.setMaxVersions(scanInfo.getMaxVersions());
-      scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner(
-          set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
-          HConstants.OLDEST_TIMESTAMP);
+      scanner = new StoreScanner(this, scanInfo, scan,
+          Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
+          this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
     }
     if (getHRegion().getCoprocessorHost() != null) {
       InternalScanner cpScanner =
@@ -719,6 +719,7 @@ public class Store extends SchemaConfigu
       scanner = cpScanner;
     }
     try {
+      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
       // if we fail.
@@ -732,7 +733,7 @@ public class Store extends SchemaConfigu
           List<KeyValue> kvs = new ArrayList<KeyValue>();
           boolean hasMore;
           do {
-            hasMore = scanner.next(kvs);
+            hasMore = scanner.next(kvs, compactionKVMax);
             if (!kvs.isEmpty()) {
               for (KeyValue kv : kvs) {
                 // If we know that this KV is going to be included always, then let us