You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/07/31 06:00:09 UTC
svn commit: r1367402 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/
test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/...
Author: larsh
Date: Tue Jul 31 04:00:08 2012
New Revision: 1367402
URL: http://svn.apache.org/viewvc?rev=1367402&view=rev
Log:
HBASE-6427 Pluggable compaction and scan policies via coprocessors
Added:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Tue Jul 31 04:00:08 2012
@@ -17,7 +17,7 @@
package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
-import java.util.Map;
+import java.util.NavigableSet;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.filter.Co
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -70,6 +72,13 @@ public abstract class BaseRegionObserver
boolean abortRequested) { }
@Override
+ public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
+ throws IOException {
+ return null;
+ }
+
+ @Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
}
@@ -78,6 +87,17 @@ public abstract class BaseRegionObserver
}
@Override
+ public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+ InternalScanner scanner) throws IOException {
+ return scanner;
+ }
+
+ @Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+ StoreFile resultFile) throws IOException {
+ }
+
+ @Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
}
@@ -101,6 +121,13 @@ public abstract class BaseRegionObserver
}
@Override
+ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+ final long earliestPutTs, final InternalScanner s) throws IOException {
+ return null;
+ }
+
+ @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
final StoreFile resultFile) throws IOException {
}
@@ -237,6 +264,13 @@ public abstract class BaseRegionObserver
}
@Override
+ public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+ final KeyValueScanner s) throws IOException {
+ return null;
+ }
+
+ @Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {
return s;
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Tue Jul 31 04:00:08 2012
@@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.coproces
import java.io.IOException;
import java.util.List;
+import java.util.NavigableSet;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -33,9 +34,12 @@ import org.apache.hadoop.hbase.filter.Co
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -61,20 +65,63 @@ public interface RegionObserver extends
void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
+ * Called before a memstore is flushed to disk and prior to creating the scanner to read from
+ * the memstore. To override or modify how a memstore is flushed,
+ * implementing classes can return a new scanner to provide the KeyValues to be
+ * stored into the new {@code StoreFile} or null to perform the default processing.
+ * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+ * effect in this hook.
+ * @param c the environment provided by the region server
+ * @param store the store being flushed
+ * @param memstoreScanner the scanner for the memstore that is flushed
+ * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+ * @return the scanner to use during the flush. {@code null} if the default implementation
+ * is to be used.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
+ throws IOException;
+
+ /**
* Called before the memstore is flushed to disk.
* @param c the environment provided by the region server
* @throws IOException if an error occurred on the coprocessor
+ * @deprecated use {@link #preFlush(ObserverContext, Store, InternalScanner)} instead
*/
void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
/**
+ * Called before a Store's memstore is flushed to disk.
+ * @param c the environment provided by the region server
+ * @param store the store being flushed
+ * @param scanner the scanner over existing data in the memstore used to create the store file
+ * @return the scanner to use during the flush. Should not be {@code null}
+ * unless the implementation is writing new store files on its own.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ final InternalScanner scanner) throws IOException;
+
+ /**
* Called after the memstore is flushed to disk.
* @param c the environment provided by the region server
* @throws IOException if an error occurred on the coprocessor
+ * @deprecated use {@link #postFlush(ObserverContext, Store, StoreFile)} instead.
*/
void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
/**
+ * Called after a Store's memstore is flushed to disk.
+ * @param c the environment provided by the region server
+ * @param store the store being flushed
+ * @param resultFile the new store file written out during the flush
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ final StoreFile resultFile) throws IOException;
+
+ /**
* Called prior to selecting the {@link StoreFile}s to compact from the list
* of available candidates. To alter the files used for compaction, you may
* mutate the passed in list of candidates.
@@ -124,6 +171,29 @@ public interface RegionObserver extends
final Store store, final InternalScanner scanner) throws IOException;
/**
+ * Called prior to writing the {@link StoreFile}s selected for compaction into
+ * a new {@code StoreFile} and prior to creating the scanner used to read the
+ * input files. To override or modify the compaction process,
+ * implementing classes can return a new scanner to provide the KeyValues to be
+ * stored into the new {@code StoreFile} or null to perform the default processing.
+ * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+ * effect in this hook.
+ * @param c the environment provided by the region server
+ * @param store the store being compacted
+ * @param scanners the list {@link StoreFileScanner}s to be read from
+ * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
+ * @param earliestPutTs timestamp of the earliest put that was found in any of the involved
+ * store files
+ * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+ * @return the scanner to use during compaction. {@code null} if the default implementation
+ * is to be used.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+ final long earliestPutTs, final InternalScanner s) throws IOException;
+
+ /**
* Called after compaction has completed and the new store file has been
* moved in to place.
* @param c the environment provided by the region server
@@ -546,6 +616,30 @@ public interface RegionObserver extends
throws IOException;
/**
+ * Called before a store opens a new scanner.
+ * This hook is called when a "user" scanner is opened.
+ * <p>
+ * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)}
+ * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner)}
+ * to override scanners created for flushes or compactions, resp.
+ * <p>
+ * Call CoprocessorEnvironment#complete to skip any subsequent chained
+ * coprocessors.
+ * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+ * effect in this hook.
+ * @param c the environment provided by the region server
+ * @param store the store being scanned
+ * @param scan the Scan specification
+ * @param targetCols columns to be used in the scanner
+ * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+ * @return a KeyValueScanner instance to use or {@code null} to use the default implementation
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+ final KeyValueScanner s) throws IOException;
+
+ /**
* Called after the client opens a new scanner.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jul 31 04:00:08 2012
@@ -1129,7 +1129,7 @@ public class HRegion implements HeapSize
* @param majorCompaction True to force a major compaction regardless of thresholds
* @throws IOException e
*/
- void compactStores(final boolean majorCompaction)
+ public void compactStores(final boolean majorCompaction)
throws IOException {
if (majorCompaction) {
this.triggerMajorCompaction();
@@ -3367,7 +3367,7 @@ public class HRegion implements HeapSize
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- StoreScanner scanner = store.getScanner(scan, entry.getValue());
+ KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Jul 31 04:00:08 2012
@@ -304,6 +304,31 @@ public class RegionCoprocessorHost
}
/**
+ * See
+ * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner)}
+ */
+ public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
+ ScanType scanType, long earliestPutTs) throws IOException {
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ InternalScanner s = null;
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
+ scanType, earliestPutTs, s);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env,e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return s;
+ }
+
+ /**
* Called prior to selecting the {@link StoreFile}s for compaction from
* the list of currently available candidates.
* @param store The store where compaction is being requested
@@ -389,7 +414,7 @@ public class RegionCoprocessorHost
* Called after the store compaction has completed.
* @param store the store being compacted
* @param resultFile the new store file written during compaction
- * @throws IOException
+ * @throws IOException
*/
public void postCompact(Store store, StoreFile resultFile) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -410,6 +435,31 @@ public class RegionCoprocessorHost
/**
* Invoked before a memstore flush
+ * @throws IOException
+ */
+ public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException {
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ boolean bypass = false;
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ scanner = ((RegionObserver)env.getInstance()).preFlush(
+ ctx, store, scanner);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env,e);
+ }
+ bypass |= ctx.shouldBypass();
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? null : scanner;
+ }
+
+ /**
+ * Invoked before a memstore flush
* @throws IOException
*/
public void preFlush() throws IOException {
@@ -430,8 +480,31 @@ public class RegionCoprocessorHost
}
/**
+ * See
+ * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)}
+ */
+ public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner) throws IOException {
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ InternalScanner s = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, memstoreScanner, s);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return s;
+ }
+
+ /**
* Invoked after a memstore flush
- * @throws IOException
+ * @throws IOException
*/
public void postFlush() throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -451,8 +524,29 @@ public class RegionCoprocessorHost
}
/**
+ * Invoked after a memstore flush
+ * @throws IOException
+ */
+ public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
* Invoked just before a split
- * @throws IOException
+ * @throws IOException
*/
public void preSplit() throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -1089,6 +1183,31 @@ public class RegionCoprocessorHost
}
/**
+ * See
+ * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner)}
+ */
+ public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
+ final NavigableSet<byte[]> targetCols) throws IOException {
+ KeyValueScanner s = null;
+ ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ try {
+ s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
+ targetCols, s);
+ } catch (Throwable e) {
+ handleCoprocessorThrowable(env, e);
+ }
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return s;
+ }
+
+ /**
* @param scan the Scan specification
* @param s the scanner
* @return the scanner instance to use
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Tue Jul 31 04:00:08 2012
@@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
-
/**
* A query matcher that is specifically designed for the scan case.
*/
@@ -136,7 +134,7 @@ public class ScanQueryMatcher {
* based on TTL
*/
public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
- NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
+ NavigableSet<byte[]> columns, ScanType scanType,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator().getRawComparator();
@@ -183,7 +181,7 @@ public class ScanQueryMatcher {
*/
ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns, long oldestUnexpiredTS) {
- this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
+ this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS);
}
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java?rev=1367402&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/ScanType.java Tue Jul 31 04:00:08 2012
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Enum to distinguish general scan types.
+ */
+@InterfaceAudience.Private
+public enum ScanType {
+ MAJOR_COMPACT,
+ MINOR_COMPACT,
+ USER_SCAN
+}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Jul 31 04:00:08 2012
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -225,9 +224,7 @@ public class Store extends SchemaConfigu
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
"ms in store " + this);
- scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
- family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
- timeToPurgeDeletes, this.comparator);
+ scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = getColumnFamilyName();
@@ -698,15 +695,30 @@ public class Store extends SchemaConfigu
if (set.size() == 0) {
return null;
}
- Scan scan = new Scan();
- scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// treat this as a minor compaction.
- InternalScanner scanner = new StoreScanner(this, scan, Collections
- .singletonList(new CollectionBackedScanner(set, this.comparator)),
- ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
- HConstants.OLDEST_TIMESTAMP);
+ InternalScanner scanner = null;
+ KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
+ if (getHRegion().getCoprocessorHost() != null) {
+ scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
+ }
+ if (scanner == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(scanInfo.getMaxVersions());
+ scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner(
+ set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
+ HConstants.OLDEST_TIMESTAMP);
+ }
+ if (getHRegion().getCoprocessorHost() != null) {
+ InternalScanner cpScanner =
+ getHRegion().getCoprocessorHost().preFlush(this, scanner);
+ // NULL scanner returned from coprocessor hooks means skip normal processing
+ if (cpScanner == null) {
+ return null;
+ }
+ scanner = cpScanner;
+ }
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
@@ -1543,20 +1555,27 @@ public class Store extends SchemaConfigu
try {
InternalScanner scanner = null;
try {
- Scan scan = new Scan();
- scan.setMaxVersions(family.getMaxVersions());
- /* include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(this, scan, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+ if (getHRegion().getCoprocessorHost() != null) {
+ scanner = getHRegion()
+ .getCoprocessorHost()
+ .preCompactScannerOpen(this, scanners,
+ majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+ }
+ if (scanner == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(getFamily().getMaxVersions());
+ /* Include deletes, unless we are doing a major compaction */
+ scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
+ majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
smallestReadPoint, earliestPutTs);
- if (region.getCoprocessorHost() != null) {
- InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
- this, scanner);
+ }
+ if (getHRegion().getCoprocessorHost() != null) {
+ InternalScanner cpScanner =
+ getHRegion().getCoprocessorHost().preCompact(this, scanner);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return null;
}
-
scanner = cpScanner;
}
@@ -2036,11 +2055,18 @@ public class Store extends SchemaConfigu
* are not in a compaction.
* @throws IOException
*/
- public StoreScanner getScanner(Scan scan,
+ public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) throws IOException {
lock.readLock().lock();
try {
- return new StoreScanner(this, scan, targetCols);
+ KeyValueScanner scanner = null;
+ if (getHRegion().getCoprocessorHost() != null) {
+ scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
+ }
+ if (scanner == null) {
+ scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
+ }
+ return scanner;
} finally {
lock.readLock().unlock();
}
@@ -2152,7 +2178,7 @@ public class Store extends SchemaConfigu
}
}
- HRegion getHRegion() {
+ public HRegion getHRegion() {
return this.region;
}
@@ -2255,6 +2281,12 @@ public class Store extends SchemaConfigu
}
storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
snapshotTimeRangeTracker, flushedSize, status);
+ if (Store.this.getHRegion().getCoprocessorHost() != null) {
+ Store.this.getHRegion()
+ .getCoprocessorHost()
+ .postFlush(Store.this, storeFile);
+ }
+
// Add new file to store files. Clear snapshot too while we have
// the Store write lock.
return Store.this.updateStorefiles(storeFile, snapshot);
@@ -2297,6 +2329,10 @@ public class Store extends SchemaConfigu
return comparator;
}
+ public ScanInfo getScanInfo() {
+ return scanInfo;
+ }
+
/**
* Immutable information for scans over a store.
*/
@@ -2314,6 +2350,17 @@ public class Store extends SchemaConfigu
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
/**
+ * @param family {@link HColumnDescriptor} describing the column family
+ * @param ttl Store's TTL (in ms)
+ * @param timeToPurgeDeletes duration in ms after which a delete marker can
+ * be purged during a major compaction.
+ * @param comparator The store's comparator
+ */
+ public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) {
+ this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
+ .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
+ }
+ /**
* @param family Name of this store's column family
* @param minVersions Store's MIN_VERSIONS setting
* @param maxVersions Store's VERSIONS setting
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Jul 31 04:00:08 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
@@ -41,7 +42,7 @@ import org.apache.hadoop.hbase.util.Envi
* Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
* into List<KeyValue> for a single row.
*/
-class StoreScanner extends NonLazyKeyValueScanner
+public class StoreScanner extends NonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
private Store store;
@@ -101,16 +102,16 @@ class StoreScanner extends NonLazyKeyVal
* @param columns which columns we are scanning
* @throws IOException
*/
- StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
+ public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
throws IOException {
- this(store, scan.getCacheBlocks(), scan, columns, store.scanInfo.getTtl(),
- store.scanInfo.getMinVersions());
+ this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
+ scanInfo.getMinVersions());
initializeMetricNames();
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException(
"Cannot specify any column for a raw scan");
}
- matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
+ matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS);
@@ -147,13 +148,13 @@ class StoreScanner extends NonLazyKeyVal
* @param smallestReadPoint the readPoint that we should use for tracking
* versions
*/
- StoreScanner(Store store, Scan scan,
+ public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType,
long smallestReadPoint, long earliestPutTs) throws IOException {
- this(store, false, scan, null, store.scanInfo.getTtl(),
- store.scanInfo.getMinVersions());
+ this(store, false, scan, null, scanInfo.getTtl(),
+ scanInfo.getMinVersions());
initializeMetricNames();
- matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
+ matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -170,7 +171,7 @@ class StoreScanner extends NonLazyKeyVal
/** Constructor for testing. */
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
- StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
+ ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners) throws IOException {
this(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP);
@@ -178,7 +179,7 @@ class StoreScanner extends NonLazyKeyVal
// Constructor for testing.
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
- StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
+ ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs)
throws IOException {
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
@@ -544,14 +545,5 @@ class StoreScanner extends NonLazyKeyVal
static void enableLazySeekGlobally(boolean enable) {
lazySeekEnabledGlobally = enable;
}
-
- /**
- * Enum to distinguish general scan types.
- */
- public static enum ScanType {
- MAJOR_COMPACT,
- MINOR_COMPACT,
- USER_SCAN
- }
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Tue Jul 31 04:00:08 2012
@@ -758,6 +758,22 @@ public class HBaseTestingUtility {
this.hbaseCluster.flushcache(tableName);
}
+ /**
+ * Compact all regions in the mini hbase cluster
+ * @throws IOException
+ */
+ public void compact(boolean major) throws IOException {
+ this.hbaseCluster.compact(major);
+ }
+
+ /**
+ * Compact all regions of a table in the mini hbase cluster
+ * @throws IOException
+ */
+ public void compact(byte [] tableName, boolean major) throws IOException {
+ this.hbaseCluster.compact(tableName, major);
+ }
+
/**
* Create a table.
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Tue Jul 31 04:00:08 2012
@@ -447,6 +447,34 @@ public class MiniHBaseCluster {
}
/**
+ * Call compactStores on all regions on all participating regionservers.
+ * @throws IOException
+ */
+ public void compact(boolean major) throws IOException {
+ for (JVMClusterUtil.RegionServerThread t:
+ this.hbaseCluster.getRegionServers()) {
+ for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
+ r.compactStores(major);
+ }
+ }
+ }
+
+ /**
+ * Call compactStores on all regions of the specified table.
+ * @throws IOException
+ */
+ public void compact(byte [] tableName, boolean major) throws IOException {
+ for (JVMClusterUtil.RegionServerThread t:
+ this.hbaseCluster.getRegionServers()) {
+ for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
+ if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
+ r.compactStores(major);
+ }
+ }
+ }
+ }
+
+ /**
* @return List of region server threads.
*/
public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Tue Jul 31 04:00:08 2012
@@ -103,12 +103,12 @@ import org.junit.experimental.categories
@Category(LargeTests.class)
public class TestFromClientSide {
final Log LOG = LogFactory.getLog(getClass());
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
- private static int SLAVES = 3;
+ protected static int SLAVES = 3;
/**
* @throws java.lang.Exception
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java?rev=1367402&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java Tue Jul 31 04:00:08 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test all client operations with a coprocessor that
+ * just implements the default flush/compact/scan policy
+ */
+@Category(LargeTests.class)
+public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName());
+ // We need more than one region server in this test
+ TEST_UTIL.startMiniCluster(SLAVES);
+ }
+}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Tue Jul 31 04:00:08 2012
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
+import java.util.NavigableSet;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
@@ -42,7 +43,9 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -63,11 +66,13 @@ public class SimpleRegionObserver extend
boolean hadPreClose;
boolean hadPostClose;
boolean hadPreFlush;
+ boolean hadPreFlushScannerOpen;
boolean hadPostFlush;
boolean hadPreSplit;
boolean hadPostSplit;
boolean hadPreCompactSelect;
boolean hadPostCompactSelect;
+ boolean hadPreCompactScanner;
boolean hadPreCompact;
boolean hadPostCompact;
boolean hadPreGet = false;
@@ -87,6 +92,7 @@ public class SimpleRegionObserver extend
boolean hadPreScannerClose = false;
boolean hadPostScannerClose = false;
boolean hadPreScannerOpen = false;
+ boolean hadPreStoreScannerOpen = false;
boolean hadPostScannerOpen = false;
boolean hadPreBulkLoadHFile = false;
boolean hadPostBulkLoadHFile = false;
@@ -120,12 +126,20 @@ public class SimpleRegionObserver extend
}
@Override
- public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) {
+ public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner) {
hadPreFlush = true;
+ return scanner;
}
@Override
- public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) {
+ public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ hadPreFlushScannerOpen = true;
+ return null;
+ }
+
+ @Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, StoreFile resultFile) {
hadPostFlush = true;
}
@@ -167,6 +181,14 @@ public class SimpleRegionObserver extend
}
@Override
+ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+ InternalScanner s) throws IOException {
+ hadPreCompactScanner = true;
+ return null;
+ }
+
+ @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile) {
hadPostCompact = true;
@@ -185,6 +207,14 @@ public class SimpleRegionObserver extend
}
@Override
+ public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+ final KeyValueScanner s) throws IOException {
+ hadPreStoreScannerOpen = true;
+ return null;
+ }
+
+ @Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s)
throws IOException {
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java Tue Jul 31 04:00:08 2012
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MD5Hash;
@@ -408,7 +407,7 @@ public class HFileReadWriteTest {
Scan scan = new Scan();
// Include deletes
- scanner = new StoreScanner(store, scan, scanners,
+ scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE);
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java?rev=1367402&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java Tue Jul 31 04:00:08 2012
@@ -0,0 +1,62 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+/**
+ * RegionObserver that just reimplements the default behavior,
+ * in order to validate that all the necessary APIs for this are public
+ * This observer is also used in {@link TestFromClientSideWithCoprocessor} and
+ * {@link TestCompactionWithCoprocessor} to make sure that a wide range
+ * of functionality still behaves as expected.
+ */
+public class NoOpScanPolicyObserver extends BaseRegionObserver {
+ /**
+ * Reimplement the default behavior
+ */
+ @Override
+ public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Store.ScanInfo oldSI = store.getScanInfo();
+ Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(),
+ oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ Scan scan = new Scan();
+ scan.setMaxVersions(oldSI.getMaxVersions());
+ return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+ ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
+ HConstants.OLDEST_TIMESTAMP);
+ }
+
+ /**
+ * Reimplement the default behavior
+ */
+ @Override
+ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+ InternalScanner s) throws IOException {
+ // this demonstrates how to override the scanners default behavior
+ Store.ScanInfo oldSI = store.getScanInfo();
+ Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(),
+ oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ Scan scan = new Scan();
+ scan.setMaxVersions(oldSI.getMaxVersions());
+ return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
+ .getSmallestReadPoint(), earliestPutTs);
+ }
+
+ @Override
+ public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
+ throws IOException {
+ return new StoreScanner(store, store.getScanInfo(), scan, targetCols);
+ }
+}
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java?rev=1367402&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithCoprocessor.java Tue Jul 31 04:00:08 2012
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure all compaction tests still pass with the preFlush and preCompact
+ * overridden to implement the default behavior
+ */
+@Category(MediumTests.class)
+public class TestCompactionWithCoprocessor extends TestCompaction {
+ /** constructor */
+ public TestCompactionWithCoprocessor() throws Exception {
+ super();
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ NoOpScanPolicyObserver.class.getName());
+ }
+}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Tue Jul 31 04:00:08 2012
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=1367402&r1=1367401&r2=1367402&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Tue Jul 31 04:00:08 2012
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.KeyValueT
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -559,7 +558,7 @@ public class TestStoreScanner extends Te
KeyValue.COMPARATOR);
StoreScanner scanner =
new StoreScanner(scan, scanInfo,
- StoreScanner.ScanType.MAJOR_COMPACT, null, scanners,
+ ScanType.MAJOR_COMPACT, null, scanners,
HConstants.OLDEST_TIMESTAMP);
List<KeyValue> results = new ArrayList<KeyValue>();
results = new ArrayList<KeyValue>();
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java?rev=1367402&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java Tue Jul 31 04:00:08 2012
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+// this is deliberately not in the o.a.h.h.regionserver package
+// in order to make sure all required classes/method are available
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+@Category(MediumTests.class)
+public class TestCoprocessorScanPolicy {
+ final Log LOG = LogFactory.getLog(getClass());
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final byte[] F = Bytes.toBytes("fam");
+ private static final byte[] Q = Bytes.toBytes("qual");
+ private static final byte[] R = Bytes.toBytes("row");
+
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ ScanObserver.class.getName());
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testBaseCases() throws Exception {
+ byte[] tableName = Bytes.toBytes("baseCases");
+ HTable t = TEST_UTIL.createTable(tableName, F, 1);
+ // set the version override to 2
+ Put p = new Put(R);
+ p.setAttribute("versions", new byte[]{});
+ p.add(F, tableName, Bytes.toBytes(2));
+ t.put(p);
+
+ // insert 2 versions
+ p = new Put(R);
+ p.add(F, Q, Q);
+ t.put(p);
+ p = new Put(R);
+ p.add(F, Q, Q);
+ t.put(p);
+ Get g = new Get(R);
+ g.setMaxVersions(10);
+ Result r = t.get(g);
+ assertEquals(2, r.size());
+
+ TEST_UTIL.flush(tableName);
+ TEST_UTIL.compact(tableName, true);
+
+ // both version are still visible even after a flush/compaction
+ g = new Get(R);
+ g.setMaxVersions(10);
+ r = t.get(g);
+ assertEquals(2, r.size());
+
+ // insert a 3rd version
+ p = new Put(R);
+ p.add(F, Q, Q);
+ t.put(p);
+ g = new Get(R);
+ g.setMaxVersions(10);
+ r = t.get(g);
+ // still only two versions visible
+ assertEquals(2, r.size());
+
+ t.close();
+ }
+
+ @Test
+ public void testTTL() throws Exception {
+ byte[] tableName = Bytes.toBytes("testTTL");
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ HColumnDescriptor hcd = new HColumnDescriptor(F)
+ .setMaxVersions(10)
+ .setTimeToLive(1);
+ desc.addFamily(hcd);
+ TEST_UTIL.getHBaseAdmin().createTable(desc);
+ HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ ManualEnvironmentEdge me = new ManualEnvironmentEdge();
+ me.setValue(now);
+ EnvironmentEdgeManagerTestHelper.injectEdge(me);
+ // 2s in the past
+ long ts = now - 2000;
+ // Set the TTL override to 3s
+ Put p = new Put(R);
+ p.setAttribute("ttl", new byte[]{});
+ p.add(F, tableName, Bytes.toBytes(3000L));
+ t.put(p);
+
+ p = new Put(R);
+ p.add(F, Q, ts, Q);
+ t.put(p);
+ p = new Put(R);
+ p.add(F, Q, ts+1, Q);
+ t.put(p);
+
+ // these two should be expired but for the override
+ // (their ts was 2s in the past)
+ Get g = new Get(R);
+ g.setMaxVersions(10);
+ Result r = t.get(g);
+ // still there?
+ assertEquals(2, r.size());
+
+ TEST_UTIL.flush(tableName);
+ TEST_UTIL.compact(tableName, true);
+
+ g = new Get(R);
+ g.setMaxVersions(10);
+ r = t.get(g);
+ // still there?
+ assertEquals(2, r.size());
+
+ // roll time forward 2s.
+ me.setValue(now + 2000);
+ // now verify that data eventually does expire
+ g = new Get(R);
+ g.setMaxVersions(10);
+ r = t.get(g);
+ // should be gone now
+ assertEquals(0, r.size());
+ t.close();
+ }
+
+ public static class ScanObserver extends BaseRegionObserver {
+ private Map<String, Long> ttls = new HashMap<String,Long>();
+ private Map<String, Integer> versions = new HashMap<String,Integer>();
+
+ // lame way to communicate with the coprocessor,
+ // since it is loaded by a different class loader
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
+ final WALEdit edit, final boolean writeToWAL) throws IOException {
+ if (put.getAttribute("ttl") != null) {
+ KeyValue kv = put.getFamilyMap().values().iterator().next().get(0);
+ ttls.put(Bytes.toString(kv.getQualifier()), Bytes.toLong(kv.getValue()));
+ c.bypass();
+ } else if (put.getAttribute("versions") != null) {
+ KeyValue kv = put.getFamilyMap().values().iterator().next().get(0);
+ versions.put(Bytes.toString(kv.getQualifier()), Bytes.toInt(kv.getValue()));
+ c.bypass();
+ }
+ }
+
+ @Override
+ public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ Long newTtl = ttls.get(store.getTableName());
+ if (newTtl != null) {
+ System.out.println("PreFlush:" + newTtl);
+ }
+ Integer newVersions = versions.get(store.getTableName());
+ Store.ScanInfo oldSI = store.getScanInfo();
+ HColumnDescriptor family = store.getFamily();
+ Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(),
+ newVersions == null ? family.getMaxVersions() : newVersions,
+ newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
+ oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ Scan scan = new Scan();
+ scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
+ return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+ ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
+ HConstants.OLDEST_TIMESTAMP);
+ }
+
+ @Override
+ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+ long earliestPutTs, InternalScanner s) throws IOException {
+ Long newTtl = ttls.get(store.getTableName());
+ Integer newVersions = versions.get(store.getTableName());
+ Store.ScanInfo oldSI = store.getScanInfo();
+ HColumnDescriptor family = store.getFamily();
+ Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(),
+ newVersions == null ? family.getMaxVersions() : newVersions,
+ newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
+ oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ Scan scan = new Scan();
+ scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
+ return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
+ .getSmallestReadPoint(), earliestPutTs);
+ }
+
+ @Override
+ public KeyValueScanner preStoreScannerOpen(
+ final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
+ final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
+ Long newTtl = ttls.get(store.getTableName());
+ Integer newVersions = versions.get(store.getTableName());
+ Store.ScanInfo oldSI = store.getScanInfo();
+ HColumnDescriptor family = store.getFamily();
+ Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(),
+ newVersions == null ? family.getMaxVersions() : newVersions,
+ newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
+ oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+ return new StoreScanner(store, scanInfo, scan, targetCols);
+ }
+ }
+
+ @org.junit.Rule
+ public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+ new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}