You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by es...@apache.org on 2017/03/21 10:45:49 UTC
hbase git commit: HBASE-17655 Removing MemStoreScanner and SnapshotScanner
Repository: hbase
Updated Branches:
refs/heads/master cc59fe4e9 -> 8f4ae0a0d
HBASE-17655 Removing MemStoreScanner and SnapshotScanner
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f4ae0a0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f4ae0a0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f4ae0a0
Branch: refs/heads/master
Commit: 8f4ae0a0dcb658c4fe669bc4cdc68ad8e6219daf
Parents: cc59fe4
Author: eshcar <es...@yahoo-inc.com>
Authored: Tue Mar 21 12:32:59 2017 +0200
Committer: eshcar <es...@yahoo-inc.com>
Committed: Tue Mar 21 12:35:47 2017 +0200
----------------------------------------------------------------------
.../example/ZooKeeperScanPolicyObserver.java | 4 +-
.../hbase/coprocessor/RegionObserver.java | 35 +-
.../hbase/mob/DefaultMobStoreFlusher.java | 2 +-
.../hbase/regionserver/AbstractMemStore.java | 14 +
.../hbase/regionserver/CompactingMemStore.java | 21 +-
.../regionserver/CompositeImmutableSegment.java | 33 +-
.../hbase/regionserver/DefaultMemStore.java | 15 +-
.../hbase/regionserver/DefaultStoreFlusher.java | 2 +-
.../hbase/regionserver/ImmutableSegment.java | 12 +-
.../hbase/regionserver/MemStoreCompactor.java | 2 +-
.../MemStoreCompactorSegmentsIterator.java | 17 +-
.../MemStoreMergerSegmentsIterator.java | 52 ++-
.../hbase/regionserver/MemStoreScanner.java | 334 -------------------
.../regionserver/MemStoreSegmentsIterator.java | 23 +-
.../hbase/regionserver/MemStoreSnapshot.java | 15 +-
.../regionserver/RegionCoprocessorHost.java | 7 +-
.../hadoop/hbase/regionserver/Segment.java | 8 +-
.../hbase/regionserver/SegmentScanner.java | 13 +-
.../hbase/regionserver/SnapshotScanner.java | 105 ------
.../hadoop/hbase/regionserver/StoreFlusher.java | 8 +-
.../hbase/regionserver/StripeStoreFlusher.java | 2 +-
.../hbase/coprocessor/SimpleRegionObserver.java | 2 +-
.../TestRegionObserverScannerOpenHook.java | 6 +-
.../regionserver/NoOpScanPolicyObserver.java | 4 +-
.../regionserver/TestCompactingMemStore.java | 30 +-
.../TestCompactingToCellArrayMapMemStore.java | 32 +-
.../hbase/regionserver/TestDefaultMemStore.java | 20 +-
.../regionserver/TestMemStoreChunkPool.java | 14 +-
.../regionserver/TestReversibleScanners.java | 66 +++-
.../hbase/util/TestCoprocessorScanPolicy.java | 5 +-
30 files changed, 262 insertions(+), 641 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index 2343c1d..b7df9b4 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -188,7 +188,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
if (scanInfo == null) {
// take default action
@@ -196,7 +196,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
}
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
- return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+ return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index a3db3b1..e36feea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.coprocessor;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@@ -128,16 +129,16 @@ public interface RegionObserver extends Coprocessor {
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
- * @param memstoreScanner the scanner for the memstore that is flushed
+ * @param scanners the scanners for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
- * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner,
+ * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, List,
* InternalScanner, long)}
*/
@Deprecated
default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
+ final Store store, final List<KeyValueScanner> scanners, final InternalScanner s)
throws IOException {
return s;
}
@@ -151,16 +152,32 @@ public interface RegionObserver extends Coprocessor {
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
- * @param memstoreScanner the scanner for the memstore that is flushed
+ * @param scanners the scanners for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param readPoint the readpoint to create scanner
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
*/
default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s,
+ final Store store, final List<KeyValueScanner> scanners, final InternalScanner s,
final long readPoint) throws IOException {
- return preFlushScannerOpen(c, store, memstoreScanner, s);
+ return preFlushScannerOpen(c, store, scanners, s);
+ }
+
+ /**
+ * Maintain backward compatibility.
+ * @param c the environment provided by the region server
+ * @param store the store being flushed
+ * @param scanner the scanner for the memstore that is flushed
+ * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+ * @param readPoint the readpoint to create scanner
+ * @return the scanner to use during the flush. {@code null} if the default implementation
+ * is to be used.
+ */
+ default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final KeyValueScanner scanner, final InternalScanner s,
+ final long readPoint) throws IOException {
+ return preFlushScannerOpen(c, store, Collections.singletonList(scanner), s, readPoint);
}
/**
@@ -1113,8 +1130,7 @@ public interface RegionObserver extends Coprocessor {
* Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened.
* <p>
- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner,
- * long)} and {@link #preCompactScannerOpen(ObserverContext,
+ * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
* Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
* to override scanners created for flushes or compactions, resp.
* <p>
@@ -1145,8 +1161,7 @@ public interface RegionObserver extends Coprocessor {
* Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened.
* <p>
- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner,
- * long)} and {@link #preCompactScannerOpen(ObserverContext,
+ * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
* Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
* to override scanners created for flushes or compactions, resp.
* <p>
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 2456a41..1a1c5a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -104,7 +104,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index d44486c..cff2b27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -60,6 +60,20 @@ public abstract class AbstractMemStore implements MemStore {
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
+ public static long addToScanners(List<? extends Segment> segments, long readPt, long order,
+ List<KeyValueScanner> scanners) {
+ for (Segment item : segments) {
+ order = addToScanners(item, readPt, order, scanners);
+ }
+ return order;
+ }
+
+ protected static long addToScanners(Segment segment, long readPt, long order,
+ List<KeyValueScanner> scanners) {
+ scanners.add(segment.getScanner(readPt, order));
+ return order - 1;
+ }
+
protected AbstractMemStore(final Configuration conf, final CellComparator c) {
this.conf = conf;
this.comparator = c;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 926b3f7..26b2f49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -318,21 +317,15 @@ public class CompactingMemStore extends AbstractMemStore {
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<? extends Segment> pipelineList = pipeline.getSegments();
- int order = pipelineList.size() + snapshot.getNumOfSegments();
+ List<? extends Segment> snapshotList = snapshot.getAllSegments();
+ long order = 1 + pipelineList.size() + snapshotList.size();
// The list of elements in pipeline + the active element + the snapshot segment
- // TODO : This will change when the snapshot is made of more than one element
// The order is the Segment ordinal
- List<KeyValueScanner> list = new ArrayList<>(order+1);
- list.add(this.active.getScanner(readPt, order + 1));
- for (Segment item : pipelineList) {
- list.add(item.getScanner(readPt, order));
- order--;
- }
- for (Segment item : snapshot.getAllSegments()) {
- list.add(item.getScanner(readPt, order));
- order--;
- }
- return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
+ List<KeyValueScanner> list = new ArrayList<KeyValueScanner>((int) order);
+ order = addToScanners(active, readPt, order, list);
+ order = addToScanners(pipelineList, readPt, order, list);
+ addToScanners(snapshotList, readPt, order, list);
+ return list;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index eeade4f..2f89ec7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -72,16 +71,6 @@ public class CompositeImmutableSegment extends ImmutableSegment {
}
/**
- * Builds a special scanner for the MemStoreSnapshot object that is different than the
- * general segment scanner.
- * @return a special scanner for the MemStoreSnapshot object
- */
- @Override
- public KeyValueScanner getSnapshotScanner() {
- return getScanner(Long.MAX_VALUE, Long.MAX_VALUE);
- }
-
- /**
* @return whether the segment has any cells
*/
@Override
@@ -148,8 +137,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
*/
@Override
public KeyValueScanner getScanner(long readPoint) {
- // Long.MAX_VALUE is DEFAULT_SCANNER_ORDER
- return getScanner(readPoint,Long.MAX_VALUE);
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
@@ -158,19 +146,14 @@ public class CompositeImmutableSegment extends ImmutableSegment {
*/
@Override
public KeyValueScanner getScanner(long readPoint, long order) {
- KeyValueScanner resultScanner;
- List<KeyValueScanner> list = new ArrayList<>(segments.size());
- for (ImmutableSegment s : segments) {
- list.add(s.getScanner(readPoint, order));
- }
-
- try {
- resultScanner = new MemStoreScanner(getComparator(), list);
- } catch (IOException ie) {
- throw new IllegalStateException(ie);
- }
+ throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+ }
- return resultScanner;
+ @Override
+ public List<KeyValueScanner> getScanners(long readPoint, long order) {
+ List<KeyValueScanner> list = new ArrayList<>(segments.size());
+ AbstractMemStore.addToScanners(segments, readPoint, order, list);
+ return list;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index b3e9c65..d1f6b1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -75,10 +74,6 @@ public class DefaultMemStore extends AbstractMemStore {
super(conf, c);
}
- void dump() {
- super.dump(LOG);
- }
-
/**
* Creates a snapshot of the current memstore.
* Snapshot must be cleared by call to {@link #clearSnapshot(long)}
@@ -129,11 +124,11 @@ public class DefaultMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
- List<KeyValueScanner> list = new ArrayList<>(2);
- list.add(this.active.getScanner(readPt, 1));
- list.add(this.snapshot.getScanner(readPt, 0));
- return Collections.<KeyValueScanner> singletonList(
- new MemStoreScanner(getComparator(), list));
+ List<KeyValueScanner> list = new ArrayList<>();
+ long order = snapshot.getNumOfSegments();
+ order = addToScanners(active, readPt, order, list);
+ addToScanners(snapshot.getAllSegments(), readPt, order, list);
+ return list;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 8cb3a1d..ef49f29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -52,7 +52,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index c8d27b2..f1273a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -34,9 +34,7 @@ import java.util.List;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
- * and is not needed for a {@link MutableSegment}. Specifically, the method
- * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the
- * {@link MemStoreSnapshot} object.
+ * and is not needed for a {@link MutableSegment}.
*/
@InterfaceAudience.Private
public class ImmutableSegment extends Segment {
@@ -130,14 +128,6 @@ public class ImmutableSegment extends Segment {
}
///////////////////// PUBLIC METHODS /////////////////////
- /**
- * Builds a special scanner for the MemStoreSnapshot object that is different than the
- * general segment scanner.
- * @return a special scanner for the MemStoreSnapshot object
- */
- public KeyValueScanner getSnapshotScanner() {
- return new SnapshotScanner(this);
- }
@Override
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index c435098..dfa7d18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -252,7 +252,7 @@ public class MemStoreCompactor {
iterator =
new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(),
- compactionKVMax, compactingMemStore.getStore());
+ compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index 6a30eac..8f481e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -50,11 +49,16 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
List<ImmutableSegment> segments,
CellComparator comparator, int compactionKVMax, Store store
) throws IOException {
- super(segments,comparator,compactionKVMax,store);
+ super(compactionKVMax);
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+ // create the list of scanners to traverse over all the data
+ // no dirty reads here as these are immutable segments
+ int order = segments.size();
+ AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners);
// build the scanner based on Query Matcher
// reinitialize the compacting scanner for each instance of iterator
- compactingScanner = createScanner(store, scanner);
+ compactingScanner = createScanner(store, scanners);
hasMore = compactingScanner.next(kvs, scannerContext);
@@ -93,7 +97,6 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
public void close() {
compactingScanner.close();
compactingScanner = null;
- scanner = null;
}
@Override
@@ -106,13 +109,13 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
*
* @return the scanner
*/
- private StoreScanner createScanner(Store store, KeyValueScanner scanner)
+ private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners)
throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(); //Get all available versions
StoreScanner internalScanner =
- new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner),
+ new StoreScanner(store, store.getScanInfo(), scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
@@ -146,4 +149,4 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
}
return hasMore;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
index 625fc76..3bb814b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -33,36 +34,67 @@ import java.util.List;
@InterfaceAudience.Private
public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator {
+ // heap of scanners, lazily initialized
+ private KeyValueHeap heap = null;
+ // remember the initial version of the scanners list
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+
+ private boolean closed = false;
+
// C-tor
public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
- int compactionKVMax, Store store
- ) throws IOException {
- super(segments,comparator,compactionKVMax,store);
+ int compactionKVMax) throws IOException {
+ super(compactionKVMax);
+ // create the list of scanners to traverse over all the data
+ // no dirty reads here as these are immutable segments
+ int order = segments.size();
+ AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners);
+ heap = new KeyValueHeap(scanners, comparator);
}
@Override
public boolean hasNext() {
- return (scanner.peek()!=null);
+ if (closed) {
+ return false;
+ }
+ if (this.heap != null) {
+ return (this.heap.peek() != null);
+ }
+ // Doing this way in case some test cases tries to peek directly
+ return false;
}
@Override
public Cell next() {
- Cell result = null;
try { // try to get next
- result = scanner.next();
+ if (!closed && heap != null) {
+ return heap.next();
+ }
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
- return result;
+ return null;
}
public void close() {
- scanner.close();
- scanner = null;
+ if (closed) {
+ return;
+ }
+ // Ensuring that all the segment scanners are closed
+ if (heap != null) {
+ heap.close();
+ // It is safe to do close as no new calls will be made to this scanner.
+ heap = null;
+ } else {
+ for (KeyValueScanner scanner : scanners) {
+ scanner.close();
+ }
+ }
+ closed = true;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
deleted file mode 100644
index 2ccdf68..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.htrace.Trace;
-
-/**
- * This is the scanner for any MemStore implementation, derived from MemStore.
- * The MemStoreScanner combines KeyValueScanner from different Segments and
- * uses the key-value heap and the reversed key-value heap for the aggregated key-values set.
- * It is assumed that only traversing forward or backward is used (without zigzagging in between)
- */
-@InterfaceAudience.Private
-public class MemStoreScanner extends NonLazyKeyValueScanner {
-
- // heap of scanners, lazily initialized
- private KeyValueHeap heap;
-
- // indicates if the scanner is created for inmemoryCompaction
- private boolean inmemoryCompaction;
-
- // remember the initial version of the scanners list
- List<KeyValueScanner> scanners;
-
- private final CellComparator comparator;
-
- private boolean closed;
-
- /**
- * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
- * and the heap is lazily initialized
- * @param comparator Cell Comparator
- * @param scanners List of scanners, from which the heap will be built
- * @param inmemoryCompaction true if used for inmemoryCompaction.
- * In this case, creates a forward heap always.
- */
- public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners,
- boolean inmemoryCompaction) throws IOException {
- super();
- this.comparator = comparator;
- this.scanners = scanners;
- if (Trace.isTracing() && Trace.currentSpan() != null) {
- Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
- }
- this.inmemoryCompaction = inmemoryCompaction;
- if (inmemoryCompaction) {
- // init the forward scanner in case of inmemoryCompaction
- initForwardKVHeapIfNeeded(comparator, scanners);
- }
- }
-
- /**
- * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
- * and the heap is lazily initialized
- * @param comparator Cell Comparator
- * @param scanners List of scanners, from which the heap will be built
- */
- public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners)
- throws IOException {
- this(comparator, scanners, false);
- }
-
- private void initForwardKVHeapIfNeeded(CellComparator comparator, List<KeyValueScanner> scanners)
- throws IOException {
- if (heap == null) {
- // lazy init
- // In a normal scan case, at the StoreScanner level before the KVHeap is
- // created we do a seek or reseek. So that will happen
- // on all the scanners that the StoreScanner is
- // made of. So when we get any of those call to this scanner we init the
- // heap here with normal forward KVHeap.
- this.heap = new KeyValueHeap(scanners, comparator);
- }
- }
-
- private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator,
- List<KeyValueScanner> scanners) throws IOException {
- boolean res = false;
- if (heap == null) {
- // lazy init
- // In a normal reverse scan case, at the ReversedStoreScanner level before the
- // ReverseKeyValueheap is
- // created we do a seekToLastRow or backwardSeek. So that will happen
- // on all the scanners that the ReversedStoreSCanner is
- // made of. So when we get any of those call to this scanner we init the
- // heap here with ReversedKVHeap.
- if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) {
- for (KeyValueScanner scanner : scanners) {
- res |= scanner.seekToLastRow();
- }
- } else {
- for (KeyValueScanner scanner : scanners) {
- res |= scanner.backwardSeek(seekKey);
- }
- }
- this.heap = new ReversedKeyValueHeap(scanners, comparator);
- }
- return res;
- }
-
- /**
- * Returns the cell from the top-most scanner without advancing the iterator.
- * The backward traversal is assumed, only if specified explicitly
- */
- @Override
- public Cell peek() {
- if (closed) {
- return null;
- }
- if (this.heap != null) {
- return this.heap.peek();
- }
- // Doing this way in case some test cases tries to peek directly to avoid NPE
- return null;
- }
-
- /**
- * Gets the next cell from the top-most scanner. Assumed forward scanning.
- */
- @Override
- public Cell next() throws IOException {
- if (closed) {
- return null;
- }
- if(this.heap != null) {
- // loop over till the next suitable value
- // take next value from the heap
- for (Cell currentCell = heap.next();
- currentCell != null;
- currentCell = heap.next()) {
- // all the logic of presenting cells is inside the internal KeyValueScanners
- // located inside the heap
- return currentCell;
- }
- }
- return null;
- }
-
- /**
- * Set the scanner at the seek key. Assumed forward scanning.
- * Must be called only once: there is no thread safety between the scanner
- * and the memStore.
- *
- * @param cell seek value
- * @return false if the key is null or if there is no data
- */
- @Override
- public boolean seek(Cell cell) throws IOException {
- if (closed) {
- return false;
- }
- initForwardKVHeapIfNeeded(comparator, scanners);
-
- if (cell == null) {
- close();
- return false;
- }
-
- return heap.seek(cell);
- }
-
- /**
- * Move forward on the sub-lists set previously by seek. Assumed forward scanning.
- *
- * @param cell seek value (should be non-null)
- * @return true if there is at least one KV to read, false otherwise
- */
- @Override
- public boolean reseek(Cell cell) throws IOException {
- /*
- * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
- * This code is executed concurrently with flush and puts, without locks.
- * Two points must be known when working on this code:
- * 1) It's not possible to use the 'kvTail' and 'snapshot'
- * variables, as they are modified during a flush.
- * 2) The ideal implementation for performance would use the sub skip list
- * implicitly pointed by the iterators 'kvsetIt' and
- * 'snapshotIt'. Unfortunately the Java API does not offer a method to
- * get it. So we remember the last keys we iterated to and restore
- * the reseeked set to at least that point.
- *
- * TODO: The above comment copied from the original MemStoreScanner
- */
- if (closed) {
- return false;
- }
- initForwardKVHeapIfNeeded(comparator, scanners);
- return heap.reseek(cell);
- }
-
- /**
- * MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all
- * scanners.
- * @see KeyValueScanner#getScannerOrder()
- */
- @Override
- public long getScannerOrder() {
- return Long.MAX_VALUE;
- }
-
- @Override
- public void close() {
- if (closed) {
- return;
- }
- // Ensuring that all the segment scanners are closed
- if (heap != null) {
- heap.close();
- // It is safe to do close as no new calls will be made to this scanner.
- heap = null;
- } else {
- for (KeyValueScanner scanner : scanners) {
- scanner.close();
- }
- }
- closed = true;
- }
-
- /**
- * Set the scanner at the seek key. Assumed backward scanning.
- *
- * @param cell seek value
- * @return false if the key is null or if there is no data
- */
- @Override
- public boolean backwardSeek(Cell cell) throws IOException {
- // The first time when this happens it sets the scanners to the seek key
- // passed by the incoming scan's start row
- if (closed) {
- return false;
- }
- initReverseKVHeapIfNeeded(cell, comparator, scanners);
- return heap.backwardSeek(cell);
- }
-
- /**
- * Assumed backward scanning.
- *
- * @param cell seek value
- * @return false if the key is null or if there is no data
- */
- @Override
- public boolean seekToPreviousRow(Cell cell) throws IOException {
- if (closed) {
- return false;
- }
- initReverseKVHeapIfNeeded(cell, comparator, scanners);
- if (heap.peek() == null) {
- restartBackwardHeap(cell);
- }
- return heap.seekToPreviousRow(cell);
- }
-
- @Override
- public boolean seekToLastRow() throws IOException {
- if (closed) {
- return false;
- }
- return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
- }
-
- /**
- * Check if this memstore may contain the required keys
- * @return False if the key definitely does not exist in this Memstore
- */
- @Override
- public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
- // TODO : Check if this can be removed.
- if (inmemoryCompaction) {
- return true;
- }
-
- for (KeyValueScanner sc : scanners) {
- if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) {
- return true;
- }
- }
- return false;
- }
-
- // debug method
- @Override
- public String toString() {
- StringBuffer buf = new StringBuffer();
- int i = 1;
- for (KeyValueScanner scanner : scanners) {
- buf.append("scanner (" + i + ") " + scanner.toString() + " ||| ");
- i++;
- }
- return buf.toString();
- }
- /****************** Private methods ******************/
- /**
- * Restructure the ended backward heap after rerunning a seekToPreviousRow()
- * on each scanner
- * @return false if given Cell does not exist in any scanner
- */
- private boolean restartBackwardHeap(Cell cell) throws IOException {
- boolean res = false;
- for (KeyValueScanner scan : scanners) {
- res |= scan.seekToPreviousRow(cell);
- }
- this.heap =
- new ReversedKeyValueHeap(scanners, comparator);
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
index 7728534..048f746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java
@@ -20,11 +20,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
/**
* The MemStoreSegmentsIterator is designed to perform one iteration over given list of segments
@@ -35,29 +34,11 @@ import java.util.*;
@InterfaceAudience.Private
public abstract class MemStoreSegmentsIterator implements Iterator<Cell> {
- // scanner for full or partial pipeline (heap of segment scanners)
- // we need to keep those scanners in order to close them at the end
- protected KeyValueScanner scanner;
-
protected final ScannerContext scannerContext;
-
// C-tor
- public MemStoreSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
- int compactionKVMax, Store store) throws IOException {
-
+ public MemStoreSegmentsIterator(int compactionKVMax) throws IOException {
this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
-
- // list of Scanners of segments in the pipeline, when compaction starts
- List<KeyValueScanner> scanners = new ArrayList<>();
-
- // create the list of scanners to traverse over all the data
- // no dirty reads here as these are immutable segments
- for (ImmutableSegment segment : segments) {
- scanners.add(segment.getScanner(Integer.MAX_VALUE));
- }
-
- scanner = new MemStoreScanner(comparator, scanners, true);
}
public abstract void close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 3858b1c..dd7f957 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import java.util.List;
/**
* Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
* count of cells in it and total memory size occupied by all the cells, timestamp information of
@@ -31,7 +32,7 @@ public class MemStoreSnapshot {
private final long dataSize;
private final long heapSize;
private final TimeRangeTracker timeRangeTracker;
- private final KeyValueScanner scanner;
+ private final List<KeyValueScanner> scanners;
private final boolean tagsPresent;
public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
@@ -40,7 +41,7 @@ public class MemStoreSnapshot {
this.dataSize = snapshot.keySize();
this.heapSize = snapshot.heapSize();
this.timeRangeTracker = snapshot.getTimeRangeTracker();
- this.scanner = snapshot.getSnapshotScanner();
+ this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE);
this.tagsPresent = snapshot.isTagsPresent();
}
@@ -66,21 +67,21 @@ public class MemStoreSnapshot {
}
public long getHeapSize() {
- return this.heapSize;
+ return heapSize;
}
/**
* @return {@link TimeRangeTracker} for all the Cells in the snapshot.
*/
public TimeRangeTracker getTimeRangeTracker() {
- return this.timeRangeTracker;
+ return timeRangeTracker;
}
/**
* @return {@link KeyValueScanner} for iterating over the snapshot
*/
- public KeyValueScanner getScanner() {
- return this.scanner;
+ public List<KeyValueScanner> getScanners() {
+ return scanners;
}
/**
@@ -89,4 +90,4 @@ public class MemStoreSnapshot {
public boolean isTagsPresent() {
return this.tagsPresent;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 925e349..64823b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -629,17 +629,16 @@ public class RegionCoprocessorHost
/**
* See
- * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
- * Store, KeyValueScanner, InternalScanner, long)}
+ * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
*/
public InternalScanner preFlushScannerOpen(final Store store,
- final KeyValueScanner memstoreScanner, final long readPoint) throws IOException {
+ final List<KeyValueScanner> scanners, final long readPoint) throws IOException {
return execOperationWithResult(null,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
- setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint));
+ setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint));
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 452cca8..6f431c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -18,7 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
@@ -102,7 +102,7 @@ public abstract class Segment {
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
- public KeyValueScanner getScanner(long readPoint) {
+ protected KeyValueScanner getScanner(long readPoint) {
return new SegmentScanner(this, readPoint);
}
@@ -115,9 +115,7 @@ public abstract class Segment {
}
public List<KeyValueScanner> getScanners(long readPoint, long order) {
- List<KeyValueScanner> scanners = new ArrayList<>(1);
- scanners.add(getScanner(readPoint, order));
- return scanners;
+ return Collections.singletonList(new SegmentScanner(this, readPoint, order));
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 5e2e36f..2727360 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -280,16 +281,11 @@ public class SegmentScanner implements KeyValueScanner {
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return getSegment().shouldSeek(scan,oldestUnexpiredTS);
}
- /**
- * This scanner is working solely on the in-memory MemStore therefore this
- * interface is not relevant.
- */
+
@Override
public boolean requestSeek(Cell c, boolean forward, boolean useBloom)
throws IOException {
-
- throw new IllegalStateException(
- "requestSeek cannot be called on MutableCellSetSegmentScanner");
+ return NonLazyKeyValueScanner.doRealSeek(this, c, forward);
}
/**
@@ -309,8 +305,7 @@ public class SegmentScanner implements KeyValueScanner {
*/
@Override
public void enforceSeek() throws IOException {
- throw new IllegalStateException(
- "enforceSeek cannot be called on MutableCellSetSegmentScanner");
+ throw new NotImplementedException("enforceSeek cannot be called on a SegmentScanner");
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
deleted file mode 100644
index 6300e00..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells
- * in the segment
- */
-@InterfaceAudience.Private
-public class SnapshotScanner extends SegmentScanner {
-
- public SnapshotScanner(Segment immutableSegment) {
- // Snapshot scanner does not need readpoint. It should read all the cells in the
- // segment
- super(immutableSegment, Long.MAX_VALUE);
- }
-
- @Override
- public Cell peek() { // sanity check, the current should be always valid
- if (closed) {
- return null;
- }
- return current;
- }
-
- @Override
- public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
- return true;
- }
-
- @Override
- public boolean backwardSeek(Cell key) throws IOException {
- throw new NotImplementedException(
- "backwardSeek must not be called on a " + "non-reversed scanner");
- }
-
- @Override
- public boolean seekToPreviousRow(Cell key) throws IOException {
- throw new NotImplementedException(
- "seekToPreviousRow must not be called on a " + "non-reversed scanner");
- }
-
- @Override
- public boolean seekToLastRow() throws IOException {
- throw new NotImplementedException(
- "seekToLastRow must not be called on a " + "non-reversed scanner");
- }
-
- @Override
- protected Iterator<Cell> getIterator(Cell cell) {
- return segment.iterator();
- }
-
- @Override
- protected void updateCurrent() {
- if (iter.hasNext()) {
- current = iter.next();
- } else {
- current = null;
- }
- }
-
- @Override
- public boolean seek(Cell seekCell) {
- // restart iterator
- iter = getIterator(seekCell);
- return reseek(seekCell);
- }
-
- @Override
- public boolean reseek(Cell seekCell) {
- while (iter.hasNext()) {
- Cell next = iter.next();
- int ret = segment.getComparator().compare(next, seekCell);
- if (ret >= 0) {
- current = next;
- return true;
- }
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 23fae6a..298f3d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -74,22 +74,22 @@ abstract class StoreFlusher {
/**
* Creates the scanner for flushing snapshot. Also calls coprocessors.
- * @param snapshotScanner
+ * @param snapshotScanners
* @param smallestReadPoint
* @return The scanner; null if coprocessor is canceling the flush.
*/
- protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
+ protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint) throws IOException {
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
- scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner,
+ scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners,
smallestReadPoint);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
scanner = new StoreScanner(store, store.getScanInfo(), scan,
- Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
+ snapshotScanners, ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 85bae9d..3f9688d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -62,7 +62,7 @@ public class StripeStoreFlusher extends StoreFlusher {
if (cellsCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint();
- InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
+ InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index ec4601c..24b5051 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -187,7 +187,7 @@ public class SimpleRegionObserver implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ctPreFlushScannerOpen.incrementAndGet();
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index ce36af8..80d0e3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -122,11 +120,11 @@ public class TestRegionObserverScannerOpenHook {
public static class NoDataFromFlush implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
Scan scan = new Scan();
scan.setFilter(new NoDataFilter());
return new StoreScanner(store, store.getScanInfo(), scan,
- Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+ scanners, ScanType.COMPACT_RETAIN_DELETES,
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
index 2d096fa..c47ed68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
@@ -43,13 +43,13 @@ public class NoOpScanPolicyObserver implements RegionObserver {
*/
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
- return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+ return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 09ddd6f..a888c45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -384,8 +384,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@@ -426,8 +428,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@@ -455,8 +459,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
@@ -524,8 +530,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
// Creating another snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
snapshot = memstore.snapshot();
@@ -540,8 +548,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index a9f8a97..5a48455 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -316,13 +316,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
// seek
- scanners.get(0).seek(KeyValue.LOWESTKEY);
int count = 0;
- while (scanners.get(0).next() != null) {
- count++;
+ for(int i = 0; i < scanners.size(); i++) {
+ scanners.get(i).seek(KeyValue.LOWESTKEY);
+ while (scanners.get(i).next() != null) {
+ count++;
+ }
}
assertEquals("the count should be ", count, 150);
- scanners.get(0).close();
+ for(int i = 0; i < scanners.size(); i++) {
+ scanners.get(i).close();
+ }
}
@Test
@@ -337,7 +341,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
// Just doing the cnt operation here
MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
- CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore());
+ CellComparator.COMPARATOR, 10);
int cnt = 0;
try {
while (itr.next() != null) {
@@ -398,8 +402,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@@ -427,8 +433,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
@@ -458,8 +466,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
- // close the scanner
- snapshot.getScanner().close();
+ // close the scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index e76da5a..7434eb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -264,12 +264,20 @@ public class TestDefaultMemStore {
protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException {
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
- assertEquals(1, memstorescanners.size());
- final KeyValueScanner scanner = memstorescanners.get(0);
- scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
- assertEquals(kv1, scanner.next());
- assertEquals(kv2, scanner.next());
- assertNull(scanner.next());
+ assertEquals(2, memstorescanners.size());
+ final KeyValueScanner scanner0 = memstorescanners.get(0);
+ final KeyValueScanner scanner1 = memstorescanners.get(1);
+ scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+ scanner1.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
+ Cell n0 = scanner0.next();
+ Cell n1 = scanner1.next();
+ assertTrue(kv1.equals(n0) || kv1.equals(n1));
+ assertTrue(kv2.equals(n0)
+ || kv2.equals(n1)
+ || kv2.equals(scanner0.next())
+ || kv2.equals(scanner1.next()));
+ assertNull(scanner0.next());
+ assertNull(scanner1.next());
}
protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 42aad5c..37a7664 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -138,7 +138,9 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner - this is how the snapshot will be used
- snapshot.getScanner().close();
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@@ -182,7 +184,9 @@ public class TestMemStoreChunkPool {
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the snapshot scanner
- snapshot.getScanner().close();
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@@ -209,8 +213,10 @@ public class TestMemStoreChunkPool {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
- // close the snapshot scanner
- snapshot.getScanner().close();
+ // close the snapshot scanners
+ for(KeyValueScanner scanner : snapshot.getScanners()) {
+ scanner.close();
+ }
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 69965ba..ecb808e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -120,7 +120,7 @@ public class TestReversibleScanners {
LOG.info("Setting read point to " + readPoint);
scanners = StoreFileScanner.getScannersForStoreFiles(
Collections.singletonList(sf), false, true, false, false, readPoint);
- seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
+ seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
}
}
@@ -135,7 +135,7 @@ public class TestReversibleScanners {
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = memstore.getScanners(readPoint);
- seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
+ seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
}
}
@@ -560,38 +560,68 @@ public class TestReversibleScanners {
}
private void seekTestOfReversibleKeyValueScannerWithMVCC(
- KeyValueScanner scanner, int readPoint) throws IOException {
- /**
- * Test with MVCC
- */
- // Test seek to last row
- KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
- ROWSIZE - 1, 0, readPoint);
- assertEquals(expectedKey != null, scanner.seekToLastRow());
- assertEquals(expectedKey, scanner.peek());
+ List<? extends KeyValueScanner> scanners, int readPoint) throws IOException {
+ /**
+ * Test with MVCC
+ */
+ // Test seek to last row
+ KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
+ ROWSIZE - 1, 0, readPoint);
+ boolean res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= scanner.seekToLastRow();
+ }
+ assertEquals(expectedKey != null, res);
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= (expectedKey.equals(scanner.peek()));
+ }
+ assertTrue(res);
// Test backward seek in two cases
// Case1: seek in the same row in backwardSeek
expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
QUALSIZE - 2, readPoint);
- assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
- assertEquals(expectedKey, scanner.peek());
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= scanner.backwardSeek(expectedKey);
+ }
+ assertEquals(expectedKey != null, res);
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= (expectedKey.equals(scanner.peek()));
+ }
+ assertTrue(res);
// Case2: seek to the previous row in backwardSeek
int seekRowNum = ROWSIZE - 3;
KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
readPoint);
- assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
- assertEquals(expectedKey, scanner.peek());
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ // Seek with seekKey (last-on-row of seekRowNum), not expectedKey:
+ // the point of Case2 is that backwardSeek from seekRowNum lands on the
+ // previous readable row, which is what expectedKey encodes.
+ res |= scanner.backwardSeek(seekKey);
+ }
+ // Mirror the other cases: at least one scanner must report success
+ // exactly when a readable previous key exists.
+ assertEquals(expectedKey != null, res);
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= (expectedKey.equals(scanner.peek()));
+ }
+ assertTrue(res);
// Test seek to previous row
seekRowNum = ROWSIZE - 4;
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
readPoint);
- assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil
- .createFirstOnRow(ROWS[seekRowNum])));
- assertEquals(expectedKey, scanner.peek());
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
+ }
+ assertEquals(expectedKey != null, res);
+ res = false;
+ for (KeyValueScanner scanner : scanners) {
+ res |= (expectedKey.equals(scanner.peek()));
+ }
+ assertTrue(res);
}
private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
index caf8de9..27e93a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -238,7 +237,7 @@ public class TestCoprocessorScanPolicy {
@Override
public InternalScanner preFlushScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
Long newTtl = ttls.get(store.getTableName());
if (newTtl != null) {
System.out.println("PreFlush:" + newTtl);
@@ -253,7 +252,7 @@ public class TestCoprocessorScanPolicy {
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
- return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+ return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
}