You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2015/06/06 04:33:02 UTC

hbase git commit: HBASE-13827 Delayed scanner close in KeyValueHeap and StoreScanner.

Repository: hbase
Updated Branches:
  refs/heads/master c1be65ecf -> fef6d7f48


HBASE-13827 Delayed scanner close in KeyValueHeap and StoreScanner.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fef6d7f4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fef6d7f4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fef6d7f4

Branch: refs/heads/master
Commit: fef6d7f48c81d63b12be4f53031bdbf208635cac
Parents: c1be65e
Author: anoopsjohn <an...@gmail.com>
Authored: Sat Jun 6 08:02:35 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Sat Jun 6 08:02:35 2015 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/io/HalfStoreFileReader.java    |  5 ++
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  5 ++
 .../hadoop/hbase/io/hfile/HFileScanner.java     |  5 ++
 .../hadoop/hbase/regionserver/KeyValueHeap.java | 22 ++++++---
 .../regionserver/ReversedKeyValueHeap.java      |  6 +--
 .../hbase/regionserver/StoreFileScanner.java    | 18 +++----
 .../hadoop/hbase/regionserver/StoreScanner.java | 49 +++++++++++++-------
 .../hbase/regionserver/TestKeyValueHeap.java    | 21 +++++++--
 8 files changed, 88 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index a95da7b..78c6734 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -288,6 +288,11 @@ public class HalfStoreFileReader extends StoreFile.Reader {
       public Cell getNextIndexedKey() {
         return null;
       }
+
+      @Override
+      public void close() {
+        this.delegate.close();
+      }
     };
   }
   

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4d1881d..d184d42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1077,6 +1077,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
               + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
     }
+
+    @Override
+    public void close() {
+      // HBASE-12295 will add code here.
+    }
   }
 
   public Path getPath() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 2b6e011..6b527f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -149,4 +149,9 @@ public interface HFileScanner {
    * @return the next key in the index (the key to seek to the next block)
    */
   Cell getNextIndexedKey();
+
+  /**
+   * Close this HFile scanner and do necessary cleanup.
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 9220d07..a12e7c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -59,7 +61,9 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
   protected KeyValueScanner current = null;
 
   protected KVScannerComparator comparator;
-  
+
+  protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>();
+
   /**
    * Constructor.  This KeyValueHeap will handle closing of passed in
    * KeyValueScanners.
@@ -87,7 +91,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
         if (scanner.peek() != null) {
           this.heap.add(scanner);
         } else {
-          scanner.close();
+          this.scannersForDelayedClose.add(scanner);
         }
       }
       this.current = pollRealKV();
@@ -108,7 +112,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     Cell kvReturn = this.current.next();
     Cell kvNext = this.current.peek();
     if (kvNext == null) {
-      this.current.close();
+      this.scannersForDelayedClose.add(this.current);
       this.current = pollRealKV();
     } else {
       KeyValueScanner topScanner = this.heap.peek();
@@ -154,7 +158,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
      */
 
     if (pee == null || !moreCells) {
-      this.current.close();
+      this.scannersForDelayedClose.add(this.current);
     } else {
       this.heap.add(this.current);
     }
@@ -210,6 +214,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
   }
 
   public void close() {
+    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
+      scanner.close();
+    }
+    this.scannersForDelayedClose.clear();
     if (this.current != null) {
       this.current.close();
     }
@@ -311,7 +319,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
       }
 
       if (!seekResult) {
-        scanner.close();
+        this.scannersForDelayedClose.add(scanner);
       } else {
         heap.add(scanner);
       }
@@ -364,12 +372,12 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
         } else {
           // Close the scanner because we did a real seek and found out there
           // are no more KVs.
-          kvScanner.close();
+          this.scannersForDelayedClose.add(kvScanner);
         }
       } else {
         // Close the scanner because it has already run out of KVs even before
         // we had to do a real seek on it.
-        kvScanner.close();
+        this.scannersForDelayedClose.add(kvScanner);
       }
       kvScanner = heap.poll();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
index 5167b4e..6914132 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
@@ -85,7 +85,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
       }
 
       if (!scanner.seekToPreviousRow(seekKey)) {
-        scanner.close();
+        this.scannersForDelayedClose.add(scanner);
       } else {
         heap.add(scanner);
       }
@@ -114,7 +114,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
         return current != null;
       }
       if (!scanner.backwardSeek(seekKey)) {
-        scanner.close();
+        this.scannersForDelayedClose.add(scanner);
       } else {
         heap.add(scanner);
       }
@@ -134,7 +134,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
       if (this.current.seekToPreviousRow(kvReturn)) {
         this.heap.add(this.current);
       } else {
-        this.current.close();
+        this.scannersForDelayedClose.add(this.current);
       }
       this.current = pollRealKV();
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 42a378d..e7a5af4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -27,8 +27,6 @@ import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
  */
 @InterfaceAudience.LimitedPrivate("Coprocessor")
 public class StoreFileScanner implements KeyValueScanner {
-  private static final Log LOG = LogFactory.getLog(HStore.class);
-
   // the reader it comes from:
   private final StoreFile.Reader reader;
   private final HFileScanner hfs;
@@ -158,7 +154,7 @@ public class StoreFileScanner implements KeyValueScanner {
     try {
       try {
         if(!seekAtOrAfter(hfs, key)) {
-          close();
+          this.cur = null;
           return false;
         }
 
@@ -185,7 +181,7 @@ public class StoreFileScanner implements KeyValueScanner {
     try {
       try {
         if (!reseekAtOrAfter(hfs, key)) {
-          close();
+          this.cur = null;
           return false;
         }
         setCurrentCell(hfs.getKeyValue());
@@ -219,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner {
     Cell startKV = cur;
     while(enforceMVCC
         && cur != null
-        && (cur.getMvccVersion() > readPt)) {
+        && (cur.getSequenceId() > readPt)) {
       hfs.next();
       setCurrentCell(hfs.getKeyValue());
       if (this.stopSkippingKVsIfNextRow
@@ -229,7 +225,6 @@ public class StoreFileScanner implements KeyValueScanner {
     }
 
     if (cur == null) {
-      close();
       return false;
     }
 
@@ -237,8 +232,8 @@ public class StoreFileScanner implements KeyValueScanner {
   }
 
   public void close() {
-    // Nothing to close on HFileScanner?
     cur = null;
+    this.hfs.close();
   }
 
   /**
@@ -421,7 +416,6 @@ public class StoreFileScanner implements KeyValueScanner {
   }
 
   @Override
-  @SuppressWarnings("deprecation")
   public boolean seekToPreviousRow(Cell key) throws IOException {
     try {
       try {
@@ -429,7 +423,7 @@ public class StoreFileScanner implements KeyValueScanner {
             key.getRowLength());
         if (seekCount != null) seekCount.incrementAndGet();
         if (!hfs.seekBefore(seekKey)) {
-          close();
+          this.cur = null;
           return false;
         }
         KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
@@ -437,7 +431,7 @@ public class StoreFileScanner implements KeyValueScanner {
 
         if (seekCount != null) seekCount.incrementAndGet();
         if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
-          close();
+          this.cur = null;
           return false;
         }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index cbca57b..4be5c7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -22,8 +22,10 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -84,6 +86,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final long maxRowSize;
   protected final long cellsPerHeartbeatCheck;
 
+  protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
+
   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
    * KVs skipped via seeking to next row/column. TODO: estimate them?
@@ -437,17 +441,32 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public void close() {
+    close(true);
+  }
+
+  private void close(boolean withHeapClose){
     lock.lock();
     try {
-    if (this.closing) return;
-    this.closing = true;
-    // under test, we dont have a this.store
-    if (this.store != null)
-      this.store.deleteChangedReaderObserver(this);
-    if (this.heap != null)
-      this.heap.close();
-    this.heap = null; // CLOSED!
-    this.lastTop = null; // If both are null, we are closed.
+      if (this.closing) return;
+      this.closing = true;
+      // under test, we don't have a this.store
+      if (this.store != null) this.store.deleteChangedReaderObserver(this);
+      if (withHeapClose) {
+        for (KeyValueHeap h : this.heapsForDelayedClose) {
+          h.close();
+        }
+        this.heapsForDelayedClose.clear();
+        if (this.heap != null) {
+          this.heap.close();
+          this.heap = null; // CLOSED!
+        }
+      } else {
+        if (this.heap != null) {
+          this.heapsForDelayedClose.add(this.heap);
+          this.heap = null;
+        }
+      }
+      this.lastTop = null; // If both are null, we are closed.
     } finally {
       lock.unlock();
     }
@@ -491,13 +510,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // if the heap was left null, then the scanners had previously run out anyways, close and
     // return.
     if (this.heap == null) {
-      close();
+      close(false);// Do all cleanup except heap.close()
       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     }
 
     Cell peeked = this.heap.peek();
     if (peeked == null) {
-      close();
+      close(false);// Do all cleanup except heap.close()
       return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     }
 
@@ -547,7 +566,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
           Filter f = matcher.getFilter();
           if (f != null) {
-            // TODO convert Scan Query Matcher to be Cell instead of KV based ?
             cell = f.transformCell(cell);
           }
 
@@ -604,7 +622,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
 
         case DONE_SCAN:
-          close();
+          close(false);// Do all cleanup except heap.close()
           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
 
         case SEEK_NEXT_ROW:
@@ -626,7 +644,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           break;
 
         case SEEK_NEXT_USING_HINT:
-          // TODO convert resee to Cell?
           Cell nextKV = matcher.getNextKeyHint(cell);
           if (nextKV != null) {
             seekAsDirection(nextKV);
@@ -645,7 +662,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     // No more keys
-    close();
+    close(false);// Do all cleanup except heap.close()
     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
     } finally {
       lock.unlock();
@@ -705,7 +722,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
 
     // close scanners to old obsolete Store files
-    this.heap.close(); // bubble thru and close all scanners.
+    this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
 
     // Let the next() call handle re-creating and seeking

http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
index 0fa904c..5b0ab3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
@@ -190,12 +190,14 @@ public class TestKeyValueHeap extends HBaseTestCase {
     l1.add(new KeyValue(row1, fam1, col5, data));
     l1.add(new KeyValue(row2, fam1, col1, data));
     l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
+    Scanner s1 = new Scanner(l1);
+    scanners.add(s1);
 
     List<Cell> l2 = new ArrayList<Cell>();
     l2.add(new KeyValue(row1, fam1, col1, data));
     l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
+    Scanner s2 = new Scanner(l2);
+    scanners.add(s2);
 
     List<Cell> l3 = new ArrayList<Cell>();
     l3.add(new KeyValue(row1, fam1, col3, data));
@@ -203,16 +205,25 @@ public class TestKeyValueHeap extends HBaseTestCase {
     l3.add(new KeyValue(row1, fam2, col1, data));
     l3.add(new KeyValue(row1, fam2, col2, data));
     l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
+    Scanner s3 = new Scanner(l3);
+    scanners.add(s3);
 
     List<Cell> l4 = new ArrayList<Cell>();
-    scanners.add(new Scanner(l4));
+    Scanner s4 = new Scanner(l4);
+    scanners.add(s4);
 
     //Creating KeyValueHeap
     KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
 
     while(kvh.next() != null);
-
+    // Once the internal scanners run out of Cells, they are removed from the KVHeap's priority
+    // queue and added to a Set for lazy close. The actual close happens only on KVHeap#close().
+    assertEquals(4, kvh.scannersForDelayedClose.size());
+    assertTrue(kvh.scannersForDelayedClose.contains(s1));
+    assertTrue(kvh.scannersForDelayedClose.contains(s2));
+    assertTrue(kvh.scannersForDelayedClose.contains(s3));
+    assertTrue(kvh.scannersForDelayedClose.contains(s4));
+    kvh.close();
     for(KeyValueScanner scanner : scanners) {
       assertTrue(((Scanner)scanner).isClosed());
     }