You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2015/06/06 04:33:02 UTC
hbase git commit: HBASE-13827 Delayed scanner close in KeyValueHeap and StoreScanner.
Repository: hbase
Updated Branches:
refs/heads/master c1be65ecf -> fef6d7f48
HBASE-13827 Delayed scanner close in KeyValueHeap and StoreScanner.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fef6d7f4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fef6d7f4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fef6d7f4
Branch: refs/heads/master
Commit: fef6d7f48c81d63b12be4f53031bdbf208635cac
Parents: c1be65e
Author: anoopsjohn <an...@gmail.com>
Authored: Sat Jun 6 08:02:35 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Sat Jun 6 08:02:35 2015 +0530
----------------------------------------------------------------------
.../hadoop/hbase/io/HalfStoreFileReader.java | 5 ++
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 5 ++
.../hadoop/hbase/io/hfile/HFileScanner.java | 5 ++
.../hadoop/hbase/regionserver/KeyValueHeap.java | 22 ++++++---
.../regionserver/ReversedKeyValueHeap.java | 6 +--
.../hbase/regionserver/StoreFileScanner.java | 18 +++----
.../hadoop/hbase/regionserver/StoreScanner.java | 49 +++++++++++++-------
.../hbase/regionserver/TestKeyValueHeap.java | 21 +++++++--
8 files changed, 88 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index a95da7b..78c6734 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -288,6 +288,11 @@ public class HalfStoreFileReader extends StoreFile.Reader {
public Cell getNextIndexedKey() {
return null;
}
+
+ @Override
+ public void close() {
+ this.delegate.close();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4d1881d..d184d42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1077,6 +1077,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
}
+
+ @Override
+ public void close() {
+ // HBASE-12295 will add code here.
+ }
}
public Path getPath() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 2b6e011..6b527f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -149,4 +149,9 @@ public interface HFileScanner {
* @return the next key in the index (the key to seek to the next block)
*/
Cell getNextIndexedKey();
+
+ /**
+ * Close this HFile scanner and do necessary cleanup.
+ */
+ void close();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 9220d07..a12e7c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -59,7 +61,9 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
protected KeyValueScanner current = null;
protected KVScannerComparator comparator;
-
+
+ protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>();
+
/**
* Constructor. This KeyValueHeap will handle closing of passed in
* KeyValueScanners.
@@ -87,7 +91,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
if (scanner.peek() != null) {
this.heap.add(scanner);
} else {
- scanner.close();
+ this.scannersForDelayedClose.add(scanner);
}
}
this.current = pollRealKV();
@@ -108,7 +112,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
Cell kvReturn = this.current.next();
Cell kvNext = this.current.peek();
if (kvNext == null) {
- this.current.close();
+ this.scannersForDelayedClose.add(this.current);
this.current = pollRealKV();
} else {
KeyValueScanner topScanner = this.heap.peek();
@@ -154,7 +158,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
*/
if (pee == null || !moreCells) {
- this.current.close();
+ this.scannersForDelayedClose.add(this.current);
} else {
this.heap.add(this.current);
}
@@ -210,6 +214,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
}
public void close() {
+ for (KeyValueScanner scanner : this.scannersForDelayedClose) {
+ scanner.close();
+ }
+ this.scannersForDelayedClose.clear();
if (this.current != null) {
this.current.close();
}
@@ -311,7 +319,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
}
if (!seekResult) {
- scanner.close();
+ this.scannersForDelayedClose.add(scanner);
} else {
heap.add(scanner);
}
@@ -364,12 +372,12 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
} else {
// Close the scanner because we did a real seek and found out there
// are no more KVs.
- kvScanner.close();
+ this.scannersForDelayedClose.add(kvScanner);
}
} else {
// Close the scanner because it has already run out of KVs even before
// we had to do a real seek on it.
- kvScanner.close();
+ this.scannersForDelayedClose.add(kvScanner);
}
kvScanner = heap.poll();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
index 5167b4e..6914132 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java
@@ -85,7 +85,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
}
if (!scanner.seekToPreviousRow(seekKey)) {
- scanner.close();
+ this.scannersForDelayedClose.add(scanner);
} else {
heap.add(scanner);
}
@@ -114,7 +114,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
return current != null;
}
if (!scanner.backwardSeek(seekKey)) {
- scanner.close();
+ this.scannersForDelayedClose.add(scanner);
} else {
heap.add(scanner);
}
@@ -134,7 +134,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
if (this.current.seekToPreviousRow(kvReturn)) {
this.heap.add(this.current);
} else {
- this.current.close();
+ this.scannersForDelayedClose.add(this.current);
}
this.current = pollRealKV();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 42a378d..e7a5af4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -27,8 +27,6 @@ import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
*/
@InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFileScanner implements KeyValueScanner {
- private static final Log LOG = LogFactory.getLog(HStore.class);
-
// the reader it comes from:
private final StoreFile.Reader reader;
private final HFileScanner hfs;
@@ -158,7 +154,7 @@ public class StoreFileScanner implements KeyValueScanner {
try {
try {
if(!seekAtOrAfter(hfs, key)) {
- close();
+ this.cur = null;
return false;
}
@@ -185,7 +181,7 @@ public class StoreFileScanner implements KeyValueScanner {
try {
try {
if (!reseekAtOrAfter(hfs, key)) {
- close();
+ this.cur = null;
return false;
}
setCurrentCell(hfs.getKeyValue());
@@ -219,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner {
Cell startKV = cur;
while(enforceMVCC
&& cur != null
- && (cur.getMvccVersion() > readPt)) {
+ && (cur.getSequenceId() > readPt)) {
hfs.next();
setCurrentCell(hfs.getKeyValue());
if (this.stopSkippingKVsIfNextRow
@@ -229,7 +225,6 @@ public class StoreFileScanner implements KeyValueScanner {
}
if (cur == null) {
- close();
return false;
}
@@ -237,8 +232,8 @@ public class StoreFileScanner implements KeyValueScanner {
}
public void close() {
- // Nothing to close on HFileScanner?
cur = null;
+ this.hfs.close();
}
/**
@@ -421,7 +416,6 @@ public class StoreFileScanner implements KeyValueScanner {
}
@Override
- @SuppressWarnings("deprecation")
public boolean seekToPreviousRow(Cell key) throws IOException {
try {
try {
@@ -429,7 +423,7 @@ public class StoreFileScanner implements KeyValueScanner {
key.getRowLength());
if (seekCount != null) seekCount.incrementAndGet();
if (!hfs.seekBefore(seekKey)) {
- close();
+ this.cur = null;
return false;
}
KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
@@ -437,7 +431,7 @@ public class StoreFileScanner implements KeyValueScanner {
if (seekCount != null) seekCount.incrementAndGet();
if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
- close();
+ this.cur = null;
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index cbca57b..4be5c7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -22,8 +22,10 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@@ -84,6 +86,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long maxRowSize;
protected final long cellsPerHeartbeatCheck;
+ protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
+
/**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
* KVs skipped via seeking to next row/column. TODO: estimate them?
@@ -437,17 +441,32 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public void close() {
+ close(true);
+ }
+
+ private void close(boolean withHeapClose){
lock.lock();
try {
- if (this.closing) return;
- this.closing = true;
- // under test, we dont have a this.store
- if (this.store != null)
- this.store.deleteChangedReaderObserver(this);
- if (this.heap != null)
- this.heap.close();
- this.heap = null; // CLOSED!
- this.lastTop = null; // If both are null, we are closed.
+ if (this.closing) return;
+ this.closing = true;
+ // under test, we dont have a this.store
+ if (this.store != null) this.store.deleteChangedReaderObserver(this);
+ if (withHeapClose) {
+ for (KeyValueHeap h : this.heapsForDelayedClose) {
+ h.close();
+ }
+ this.heapsForDelayedClose.clear();
+ if (this.heap != null) {
+ this.heap.close();
+ this.heap = null; // CLOSED!
+ }
+ } else {
+ if (this.heap != null) {
+ this.heapsForDelayedClose.add(this.heap);
+ this.heap = null;
+ }
+ }
+ this.lastTop = null; // If both are null, we are closed.
} finally {
lock.unlock();
}
@@ -491,13 +510,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
- close();
+ close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
Cell peeked = this.heap.peek();
if (peeked == null) {
- close();
+ close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
@@ -547,7 +566,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
Filter f = matcher.getFilter();
if (f != null) {
- // TODO convert Scan Query Matcher to be Cell instead of KV based ?
cell = f.transformCell(cell);
}
@@ -604,7 +622,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
case DONE_SCAN:
- close();
+ close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
case SEEK_NEXT_ROW:
@@ -626,7 +644,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
break;
case SEEK_NEXT_USING_HINT:
- // TODO convert resee to Cell?
Cell nextKV = matcher.getNextKeyHint(cell);
if (nextKV != null) {
seekAsDirection(nextKV);
@@ -645,7 +662,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// No more keys
- close();
+ close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} finally {
lock.unlock();
@@ -705,7 +722,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
//DebugPrint.println("SS updateReaders, topKey = " + lastTop);
// close scanners to old obsolete Store files
- this.heap.close(); // bubble thru and close all scanners.
+ this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
// Let the next() call handle re-creating and seeking
http://git-wip-us.apache.org/repos/asf/hbase/blob/fef6d7f4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
index 0fa904c..5b0ab3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
@@ -190,12 +190,14 @@ public class TestKeyValueHeap extends HBaseTestCase {
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
- scanners.add(new Scanner(l1));
+ Scanner s1 = new Scanner(l1);
+ scanners.add(s1);
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
- scanners.add(new Scanner(l2));
+ Scanner s2 = new Scanner(l2);
+ scanners.add(s2);
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
@@ -203,16 +205,25 @@ public class TestKeyValueHeap extends HBaseTestCase {
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
- scanners.add(new Scanner(l3));
+ Scanner s3 = new Scanner(l3);
+ scanners.add(s3);
List<Cell> l4 = new ArrayList<Cell>();
- scanners.add(new Scanner(l4));
+ Scanner s4 = new Scanner(l4);
+ scanners.add(s4);
//Creating KeyValueHeap
KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
while(kvh.next() != null);
-
+ // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
+ // queue and added to a Set for lazy close. The actual close will happen only on KVHeap#close()
+ assertEquals(4, kvh.scannersForDelayedClose.size());
+ assertTrue(kvh.scannersForDelayedClose.contains(s1));
+ assertTrue(kvh.scannersForDelayedClose.contains(s2));
+ assertTrue(kvh.scannersForDelayedClose.contains(s3));
+ assertTrue(kvh.scannersForDelayedClose.contains(s4));
+ kvh.close();
for(KeyValueScanner scanner : scanners) {
assertTrue(((Scanner)scanner).isClosed());
}