You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/09/29 06:57:13 UTC
[28/50] [abbrv] hbase git commit: HBASE-16643 - Reverse scanner heap
creation may not allow MSLAB closure due to improper ref counting of segments
(Ram)
HBASE-16643 - Reverse scanner heap creation may not allow MSLAB closure
due to improper ref counting of segments (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f196a8c3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f196a8c3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f196a8c3
Branch: refs/heads/hbase-14439
Commit: f196a8c331227c0258c0b25c1837f63e6504e8d6
Parents: db394f5
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Sep 27 14:24:19 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Tue Sep 27 14:24:19 2016 +0530
----------------------------------------------------------------------
.../regionserver/MemStoreCompactorIterator.java | 3 +-
.../hbase/regionserver/MemStoreScanner.java | 260 ++++++++-----------
.../hbase/regionserver/SegmentScanner.java | 4 +-
.../hadoop/hbase/client/TestFromClientSide.java | 1 -
.../TestCompactingToCellArrayMapMemStore.java | 26 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 2 -
6 files changed, 122 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f196a8c3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
index 2eafb42..9798ec2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
@@ -65,7 +65,7 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
scanners.add(segment.getScanner(store.getSmallestReadPoint()));
}
- scanner = new MemStoreScanner(comparator, scanners, MemStoreScanner.Type.COMPACT_FORWARD);
+ scanner = new MemStoreScanner(comparator, scanners, true);
// reinitialize the compacting scanner for each instance of iterator
compactingScanner = createScanner(store, scanner);
@@ -101,7 +101,6 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
public void close() {
compactingScanner.close();
compactingScanner = null;
- scanner.close();
scanner = null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f196a8c3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
index 74d061c..2371e20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
@@ -24,6 +24,8 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
@@ -37,67 +39,90 @@ import org.apache.htrace.Trace;
*/
@InterfaceAudience.Private
public class MemStoreScanner extends NonLazyKeyValueScanner {
- /**
- * Types of cell MemStoreScanner
- */
- static public enum Type {
- UNDEFINED,
- COMPACT_FORWARD,
- USER_SCAN_FORWARD,
- USER_SCAN_BACKWARD
- }
- // heap of scanners used for traversing forward
- private KeyValueHeap forwardHeap;
- // reversed scanners heap for traversing backward
- private ReversedKeyValueHeap backwardHeap;
+ // heap of scanners, lazily initialized
+ private KeyValueHeap heap;
- // The type of the scan is defined by constructor
- // or according to the first usage
- private Type type = Type.UNDEFINED;
+ // indicates if the scanner is created for inmemoryCompaction
+ private boolean inmemoryCompaction;
// remember the initial version of the scanners list
List<KeyValueScanner> scanners;
private final CellComparator comparator;
+ private boolean closed;
+
/**
- * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
- * After constructor only one heap is going to be initialized for entire lifespan
- * of the MemStoreScanner. A specific scanner can only be one directional!
- *
+ * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
+ * and the heap is lazily initialized
* @param comparator Cell Comparator
- * @param scanners List of scanners, from which the heap will be built
- * @param type The scan type COMPACT_FORWARD should be used for compaction
+ * @param scanners List of scanners, from which the heap will be built
+ * @param inmemoryCompaction true if used for inmemoryCompaction.
+ * In this case, creates a forward heap always.
*/
- public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners, Type type)
- throws IOException {
+ public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners,
+ boolean inmemoryCompaction) throws IOException {
super();
- this.type = type;
- switch (type) {
- case UNDEFINED:
- case USER_SCAN_FORWARD:
- case COMPACT_FORWARD:
- this.forwardHeap = new KeyValueHeap(scanners, comparator);
- break;
- case USER_SCAN_BACKWARD:
- this.backwardHeap = new ReversedKeyValueHeap(scanners, comparator);
- break;
- default:
- throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner");
- }
this.comparator = comparator;
this.scanners = scanners;
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
}
+ this.inmemoryCompaction = inmemoryCompaction;
+ if (inmemoryCompaction) {
+ // init the forward scanner in case of inmemoryCompaction
+ initForwardKVHeapIfNeeded(comparator, scanners);
+ }
}
- /* Constructor used only when the scan usage is unknown
- and need to be defined according to the first move */
+ /**
+ * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
+ * and the heap is lazily initialized
+ * @param comparator Cell Comparator
+ * @param scanners List of scanners, from which the heap will be built
+ */
public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners)
throws IOException {
- this(comparator, scanners, Type.UNDEFINED);
+ this(comparator, scanners, false);
+ }
+
+ private void initForwardKVHeapIfNeeded(CellComparator comparator, List<KeyValueScanner> scanners)
+ throws IOException {
+ if (heap == null) {
+ // lazy init
+ // In a normal scan case, at the StoreScanner level before the KVHeap is
+ // created we do a seek or reseek. So that will happen
+ // on all the scanners that the StoreScanner is
+ // made of. So when we get any of those call to this scanner we init the
+ // heap here with normal forward KVHeap.
+ this.heap = new KeyValueHeap(scanners, comparator);
+ }
+ }
+
+ private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator,
+ List<KeyValueScanner> scanners) throws IOException {
+ boolean res = false;
+ if (heap == null) {
+ // lazy init
+ // In a normal reverse scan case, at the ReversedStoreScanner level before the
+ // ReverseKeyValueheap is
+ // created we do a seekToLastRow or backwardSeek. So that will happen
+ // on all the scanners that the ReversedStoreSCanner is
+ // made of. So when we get any of those call to this scanner we init the
+ // heap here with ReversedKVHeap.
+ if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) {
+ for (KeyValueScanner scanner : scanners) {
+ res |= scanner.seekToLastRow();
+ }
+ } else {
+ for (KeyValueScanner scanner : scanners) {
+ res |= scanner.backwardSeek(seekKey);
+ }
+ }
+ this.heap = new ReversedKeyValueHeap(scanners, comparator);
+ }
+ return res;
}
/**
@@ -105,30 +130,29 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
* The backward traversal is assumed, only if specified explicitly
*/
@Override
- public synchronized Cell peek() {
- if (type == Type.USER_SCAN_BACKWARD) {
- return backwardHeap.peek();
+ public Cell peek() {
+ if (this.heap != null) {
+ return this.heap.peek();
}
- return forwardHeap.peek();
+ // Doing this way in case some test cases tries to peek directly to avoid NPE
+ return null;
}
/**
* Gets the next cell from the top-most scanner. Assumed forward scanning.
*/
@Override
- public synchronized Cell next() throws IOException {
- KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap;
-
- // loop over till the next suitable value
- // take next value from the heap
- for (Cell currentCell = heap.next();
- currentCell != null;
- currentCell = heap.next()) {
-
- // all the logic of presenting cells is inside the internal KeyValueScanners
- // located inside the heap
-
- return currentCell;
+ public Cell next() throws IOException {
+ if(this.heap != null) {
+ // loop over till the next suitable value
+ // take next value from the heap
+ for (Cell currentCell = heap.next();
+ currentCell != null;
+ currentCell = heap.next()) {
+ // all the logic of presenting cells is inside the internal KeyValueScanners
+ // located inside the heap
+ return currentCell;
+ }
}
return null;
}
@@ -142,15 +166,15 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
* @return false if the key is null or if there is no data
*/
@Override
- public synchronized boolean seek(Cell cell) throws IOException {
- assertForward();
+ public boolean seek(Cell cell) throws IOException {
+ initForwardKVHeapIfNeeded(comparator, scanners);
if (cell == null) {
close();
return false;
}
- return forwardHeap.seek(cell);
+ return heap.seek(cell);
}
/**
@@ -160,7 +184,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
* @return true if there is at least one KV to read, false otherwise
*/
@Override
- public synchronized boolean reseek(Cell cell) throws IOException {
+ public boolean reseek(Cell cell) throws IOException {
/*
* See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
* This code is executed concurrently with flush and puts, without locks.
@@ -175,8 +199,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
*
* TODO: The above comment copied from the original MemStoreScanner
*/
- assertForward();
- return forwardHeap.reseek(cell);
+ initForwardKVHeapIfNeeded(comparator, scanners);
+ return heap.reseek(cell);
}
/**
@@ -190,22 +214,21 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
}
@Override
- public synchronized void close() {
-
- if (forwardHeap != null) {
- assert ((type == Type.USER_SCAN_FORWARD) ||
- (type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED));
- forwardHeap.close();
- forwardHeap = null;
- if (backwardHeap != null) {
- backwardHeap.close();
- backwardHeap = null;
+ public void close() {
+ if (closed) {
+ return;
+ }
+ // Ensuring that all the segment scanners are closed
+ if (heap != null) {
+ heap.close();
+ // It is safe to do close as no new calls will be made to this scanner.
+ heap = null;
+ } else {
+ for (KeyValueScanner scanner : scanners) {
+ scanner.close();
}
- } else if (backwardHeap != null) {
- assert (type == Type.USER_SCAN_BACKWARD);
- backwardHeap.close();
- backwardHeap = null;
}
+ closed = true;
}
/**
@@ -215,9 +238,11 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
* @return false if the key is null or if there is no data
*/
@Override
- public synchronized boolean backwardSeek(Cell cell) throws IOException {
- initBackwardHeapIfNeeded(cell, false);
- return backwardHeap.backwardSeek(cell);
+ public boolean backwardSeek(Cell cell) throws IOException {
+ // The first time when this happens it sets the scanners to the seek key
+ // passed by the incoming scan's start row
+ initReverseKVHeapIfNeeded(cell, comparator, scanners);
+ return heap.backwardSeek(cell);
}
/**
@@ -227,22 +252,17 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
* @return false if the key is null or if there is no data
*/
@Override
- public synchronized boolean seekToPreviousRow(Cell cell) throws IOException {
- initBackwardHeapIfNeeded(cell, false);
- if (backwardHeap.peek() == null) {
+ public boolean seekToPreviousRow(Cell cell) throws IOException {
+ initReverseKVHeapIfNeeded(cell, comparator, scanners);
+ if (heap.peek() == null) {
restartBackwardHeap(cell);
}
- return backwardHeap.seekToPreviousRow(cell);
+ return heap.seekToPreviousRow(cell);
}
@Override
- public synchronized boolean seekToLastRow() throws IOException {
- // TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't
- // implement seekToLastRow() method :(
- // however seekToLastRow() was implemented in internal MemStoreScanner
- // so I wonder whether we need to come with our own workaround, or to update
- // ReversedKeyValueHeap
- return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true);
+ public boolean seekToLastRow() throws IOException {
+ return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
}
/**
@@ -250,9 +270,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
* @return False if the key definitely does not exist in this Memstore
*/
@Override
- public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
-
- if (type == Type.COMPACT_FORWARD) {
+ public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+ // TODO : Check if this can be removed.
+ if (inmemoryCompaction) {
return true;
}
@@ -286,58 +306,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
for (KeyValueScanner scan : scanners) {
res |= scan.seekToPreviousRow(cell);
}
- this.backwardHeap =
+ this.heap =
new ReversedKeyValueHeap(scanners, comparator);
return res;
}
-
- /**
- * Checks whether the type of the scan suits the assumption of moving backward
- */
- private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException {
- boolean res = false;
- if (toLast && (type != Type.UNDEFINED)) {
- throw new IllegalStateException(
- "Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString());
- }
- if (type == Type.UNDEFINED) {
- // In case we started from peek, release the forward heap
- // and build backward. Set the correct type. Thus this turn
- // can happen only once
- if ((backwardHeap == null) && (forwardHeap != null)) {
- forwardHeap.close();
- forwardHeap = null;
- // before building the heap seek for the relevant key on the scanners,
- // for the heap to be built from the scanners correctly
- for (KeyValueScanner scan : scanners) {
- if (toLast) {
- res |= scan.seekToLastRow();
- } else {
- res |= scan.backwardSeek(cell);
- }
- }
- this.backwardHeap =
- new ReversedKeyValueHeap(scanners, comparator);
- type = Type.USER_SCAN_BACKWARD;
- }
- }
-
- if (type == Type.USER_SCAN_FORWARD) {
- throw new IllegalStateException("Traversing backward with forward scan");
- }
- return res;
- }
-
- /**
- * Checks whether the type of the scan suits the assumption of moving forward
- */
- private void assertForward() throws IllegalStateException {
- if (type == Type.UNDEFINED) {
- type = Type.USER_SCAN_FORWARD;
- }
-
- if (type == Type.USER_SCAN_BACKWARD) {
- throw new IllegalStateException("Traversing forward with backward scan");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f196a8c3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 8cf0a7c..92c3443 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -67,11 +67,11 @@ public class SegmentScanner implements KeyValueScanner {
protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) {
this.segment = segment;
this.readPoint = readPoint;
+ //increase the reference count so the underlying structure will not be de-allocated
+ this.segment.incScannerCount();
iter = segment.iterator();
// the initialization of the current is required for working with heap of SegmentScanners
current = getNext();
- //increase the reference count so the underlying structure will not be de-allocated
- this.segment.incScannerCount();
this.scannerOrder = scannerOrder;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f196a8c3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 33a5315..f10cce3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f196a8c3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index fefe2c1..f89a040 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -22,12 +22,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdge;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -36,9 +33,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.util.ArrayList;
import java.util.List;
@@ -281,13 +275,14 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
Threads.sleep(10);
}
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
- MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners);
+ // seek
+ scanners.get(0).seek(KeyValue.LOWESTKEY);
int count = 0;
- while (scanner.next() != null) {
+ while (scanners.get(0).next() != null) {
count++;
}
assertEquals("the count should be ", count, 150);
- scanner.close();
+ scanners.get(0).close();
}
@Test
@@ -345,17 +340,4 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);//
}
-
- private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
- long t = 1234;
-
- @Override public long currentTime() {
- return t;
- }
-
- public void setCurrentTimeMillis(long t) {
- this.t = t;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f196a8c3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 2042f52..80d220d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -103,7 +102,6 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;