You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/20 14:52:35 UTC
svn commit: r1400431 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/region...
Author: mbautin
Date: Sat Oct 20 12:52:34 2012
New Revision: 1400431
URL: http://svn.apache.org/viewvc?rev=1400431&view=rev
Log:
[HBASE-6967] Compaction hook framework
Author: adela
Summary: as in title
Test Plan: make dummy KVAggregator and check whether it works (TestKeyValueAggregator unit test added)
Reviewers: kannan, kranganathan
Reviewed By: kannan
CC: hbase-eng@, erling
Differential Revision: https://phabricator.fb.com/D598973
Task ID: 1749101
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java
- copied, changed from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java
- copied, changed from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -1,60 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * Default aggregator used by {@link StoreScanner}.
- */
-public class DefaultKeyValueAggregator implements KeyValueAggregator {
-
- private static DefaultKeyValueAggregator instance = null;
-
- protected DefaultKeyValueAggregator() {
- }
-
- public static DefaultKeyValueAggregator getInstance() {
- if (instance == null) {
- instance = new DefaultKeyValueAggregator();
- }
- return instance;
- }
-
- @Override
- public void reset() {
- }
-
- @Override
- public KeyValue process(KeyValue kv) {
- return kv;
- }
-
- @Override
- public ScanQueryMatcher.MatchCode nextAction(
- ScanQueryMatcher.MatchCode origCode) {
- return origCode;
- }
-
- @Override
- public KeyValue finalizeKeyValues() {
- return null;
- }
-}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -1,73 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * <p>
- * Used by the {@link StoreScanner} to aggregate KeyValue instances to form a
- * single row/column. Implementations of this interface can expect the following
- * call sequence:
- * </p>
- * <ul>
- * <li>reset(): When the scanner starts the row/col</li>
- * <li>process(): This is called for each included KeyValue as determined by
- * {@link ScanQueryMatcher}. The aggregator can decide to emit an intermediate
- * KeyValue.
- * </li>
- * <li>nextAction(): The scanner will decide the next action by calling this if
- * a call to process() returns a non-null value.
- * </li>
- * <li>finalizeKeyValues(): The aggregator can manipulate the final row/column
- * contents that will be returned by the scanner. This lets it use the
- * intermediate KeyValue instances returned till then and any accumulated state
- * at the end to create the final row.
- * </li>
- * </ul>
- */
-public interface KeyValueAggregator {
- /**
- * Called when the {@link StoreScanner} starts its attempt to get a row/col.
- */
- public void reset();
-
- /**
- * Make the aggregator process a single KeyValue.
- * @param kv
- * @return
- */
- public KeyValue process(KeyValue kv);
-
- /**
- * Called if the previous call to
- * {@link #process(org.apache.hadoop.hbase.KeyValue)} had a non-null
- * return value.
- * @param origCode
- * @return
- */
- public ScanQueryMatcher.MatchCode nextAction(
- ScanQueryMatcher.MatchCode origCode);
-
- /**
- * Called at the end of the scan to flush out any remaining KeyValue.
- */
- public KeyValue finalizeKeyValues();
-}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Oct 20 12:52:34 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HColumnDe
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
@@ -143,6 +146,8 @@ public class Store extends SchemaConfigu
// Comparing KeyValues
final KeyValue.KVComparator comparator;
+ private Class<KeyValueAggregator> aggregatorClass = null;
+
/**
* Constructor
* @param basedir qualified path under which the region directory lives;
@@ -154,6 +159,7 @@ public class Store extends SchemaConfigu
* failed. Can be null.
* @throws IOException
*/
+ @SuppressWarnings("unchecked")
protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Configuration confParam)
throws IOException {
@@ -235,6 +241,16 @@ public class Store extends SchemaConfigu
setCompactionPolicy(conf.get(HConstants.COMPACTION_MANAGER_CLASS,
HConstants.DEFAULT_COMPACTION_MANAGER_CLASS));
+
+ String aggregatorString = conf.get("aggregator");
+ if (aggregatorString != null && !aggregatorString.isEmpty()) {
+ try {
+ this.aggregatorClass = ((Class<KeyValueAggregator>) Class
+ .forName(aggregatorString));
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
}
/**
@@ -484,7 +500,7 @@ public class Store extends SchemaConfigu
srcPath = tmpPath;
}
- Path dstPath = StoreFile.getRandomFilename(fs, homedir,
+ Path dstPath = StoreFile.getRandomFilename(fs, homedir,
(sequenceId >= 0) ? ("_SeqId_" + sequenceId + "_") : null);
LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
StoreFile.rename(fs, srcPath, dstPath);
@@ -637,7 +653,7 @@ public class Store extends SchemaConfigu
InternalScanner scanner = new StoreScanner(this, scan,
MemStore.getSnapshotScanners(snapshot, this.comparator),
this.region.getSmallestReadPoint(),
- Long.MIN_VALUE); // include all deletes
+ Long.MIN_VALUE, getAggregator()); // include all deletes
String fileName;
try {
@@ -1151,7 +1167,7 @@ public class Store extends SchemaConfigu
(System.currentTimeMillis() - this.timeToPurgeDeletes))
: Long.MIN_VALUE;
scanner = new StoreScanner(this, scan, scanners, smallestReadPoint,
- retainDeletesUntil);
+ retainDeletesUntil, getAggregator());
int bytesWritten = 0;
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@@ -1620,7 +1636,7 @@ public class Store extends SchemaConfigu
final NavigableSet<byte []> targetCols) throws IOException {
lock.readLock().lock();
try {
- return new StoreScanner(this, scan, targetCols);
+ return new StoreScanner(this, scan, targetCols, getAggregator());
} finally {
lock.readLock().unlock();
}
@@ -1817,8 +1833,8 @@ public class Store extends SchemaConfigu
public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
- + (17 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG)
- + (4 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN);
+ + (18 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG)
+ + (4 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN) ;
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
@@ -1839,4 +1855,23 @@ public class Store extends SchemaConfigu
HConstants.DEFAULT_COMPACTION_MANAGER_CLASS));
}
+ /**
+ * Gets the aggregator which is specified in the Configuration. If none is
+ * specified, returns the DefaultKeyValueAggregator
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ */
+ private KeyValueAggregator getAggregator() {
+ if (aggregatorClass == null) {
+ return DefaultKeyValueAggregator.getInstance();
+ } else {
+ try {
+ return aggregatorClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Oct 20 12:52:34 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
@@ -43,7 +44,7 @@ import static org.apache.hadoop.hbase.re
* Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
* into List<KeyValue> for a single row.
*/
-class StoreScanner extends NonLazyKeyValueScanner
+public class StoreScanner extends NonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
private Store store;
@@ -77,7 +78,7 @@ class StoreScanner extends NonLazyKeyVal
/** An internal constructor. */
private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
- final NavigableSet<byte[]> columns, long ttl) {
+ final NavigableSet<byte[]> columns, long ttl, KeyValueAggregator keyValueAggregator) {
this.store = store;
initializeMetricNames();
this.cacheBlocks = cacheBlocks;
@@ -85,7 +86,7 @@ class StoreScanner extends NonLazyKeyVal
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
this.scan = scan;
- this.keyValueAggregator = DefaultKeyValueAggregator.getInstance();
+ this.keyValueAggregator = keyValueAggregator;
this.columns = columns;
oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
@@ -105,9 +106,9 @@ class StoreScanner extends NonLazyKeyVal
* @param columns which columns we are scanning
* @throws IOException
*/
- StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
- throws IOException {
- this(store, scan.getCacheBlocks(), scan, columns, store.ttl);
+ StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns,
+ KeyValueAggregator keyValueAggregator) throws IOException {
+ this(store, scan.getCacheBlocks(), scan, columns, store.ttl, keyValueAggregator);
matcher =
new ScanQueryMatcher(scan, store.getFamily().getName(), columns,
store.comparator.getRawComparator(),
@@ -155,8 +156,8 @@ class StoreScanner extends NonLazyKeyVal
*/
StoreScanner(Store store, Scan scan,
List<? extends KeyValueScanner> scanners, long smallestReadPoint,
- long retainDeletesInOutputUntil) throws IOException {
- this(store, false, scan, null, store.ttl);
+ long retainDeletesInOutputUntil, KeyValueAggregator keyValueAggregator) throws IOException {
+ this(store, false, scan, null, store.ttl, keyValueAggregator);
matcher =
new ScanQueryMatcher(scan, store.getFamily().getName(), null,
@@ -180,9 +181,10 @@ class StoreScanner extends NonLazyKeyVal
StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
final KeyValue.KVComparator comparator,
final NavigableSet<byte[]> columns,
- final List<KeyValueScanner> scanners)
+ final List<KeyValueScanner> scanners,
+ final KeyValueAggregator keyValueAggregator)
throws IOException {
- this(scan, colFamily, ttl, comparator, columns, scanners, Long.MAX_VALUE);
+ this(scan, colFamily, ttl, comparator, columns, scanners, Long.MAX_VALUE, keyValueAggregator);
}
/** Constructor for testing. */
@@ -190,9 +192,10 @@ class StoreScanner extends NonLazyKeyVal
final KeyValue.KVComparator comparator,
final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners,
- final long retainDeletesInOutputUntil)
+ final long retainDeletesInOutputUntil,
+ final KeyValueAggregator keyValueAggregator)
throws IOException {
- this(null, scan.getCacheBlocks(), scan, columns, ttl);
+ this(null, scan.getCacheBlocks(), scan, columns, ttl, keyValueAggregator);
this.matcher =
new ScanQueryMatcher(scan, colFamily, columns,
comparator.getRawComparator(), scan.getMaxVersions(),
Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java (from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java&r1=1400430&r2=1400431&rev=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -17,24 +17,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher;
/**
* Default aggregator used by {@link StoreScanner}.
*/
public class DefaultKeyValueAggregator implements KeyValueAggregator {
- private static DefaultKeyValueAggregator instance = null;
+ static DefaultKeyValueAggregator instance = new DefaultKeyValueAggregator();
protected DefaultKeyValueAggregator() {
}
- public static DefaultKeyValueAggregator getInstance() {
- if (instance == null) {
- instance = new DefaultKeyValueAggregator();
- }
+ public static KeyValueAggregator getInstance() {
return instance;
}
@@ -57,4 +55,5 @@ public class DefaultKeyValueAggregator i
public KeyValue finalizeKeyValues() {
return null;
}
+
}
Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java (from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java&r1=1400430&r2=1400431&rev=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -17,9 +17,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher;
/**
* <p>
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java Sat Oct 20 12:52:34 2012
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.util.StringUtils;
@@ -417,7 +418,8 @@ public class HFileReadWriteTest {
Scan scan = new Scan();
// Include deletes
- scanner = new StoreScanner(store, scan, scanners, Long.MAX_VALUE, Long.MIN_VALUE);
+ scanner = new StoreScanner(store, scan, scanners, Long.MAX_VALUE,
+ Long.MIN_VALUE, DefaultKeyValueAggregator.getInstance());
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
while (scanner.next(kvs) || kvs.size() != 0) {
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Sat Oct 20 12:52:34 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
@@ -63,8 +64,8 @@ public class TestMemStore extends TestCa
super.setUp();
this.mvcc = new MultiVersionConsistencyControl();
this.memstore = new MemStore();
- SchemaMetrics.configureGlobally(HBaseConfiguration.create());
- }
+ SchemaMetrics.configureGlobally(HBaseConfiguration.create());
+ }
public void testPutSameKey() {
byte [] bytes = Bytes.toBytes(getName());
@@ -90,7 +91,7 @@ public class TestMemStore extends TestCa
List<KeyValue> result = new ArrayList<KeyValue>();
MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
- this.memstore.comparator, null, memstorescanners);
+ this.memstore.comparator, null, memstorescanners, DefaultKeyValueAggregator.getInstance());
int count = 0;
try {
while (s.next(result)) {
@@ -112,7 +113,7 @@ public class TestMemStore extends TestCa
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
- this.memstore.comparator, null, memstorescanners);
+ this.memstore.comparator, null, memstorescanners, DefaultKeyValueAggregator.getInstance());
count = 0;
try {
while (s.next(result)) {
@@ -139,7 +140,7 @@ public class TestMemStore extends TestCa
// Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis();
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
- this.memstore.comparator, null, memstorescanners);
+ this.memstore.comparator, null, memstorescanners, DefaultKeyValueAggregator.getInstance());
count = 0;
int snapshotIndex = 5;
try {
@@ -556,7 +557,7 @@ public class TestMemStore extends TestCa
InternalScanner scanner =
new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
Integer.MAX_VALUE, this.memstore.comparator, null,
- memstore.getScanners());
+ memstore.getScanners(), DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Sat Oct 20 12:52:34 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
@@ -79,7 +80,7 @@ public class TestStoreScanner extends Te
scanSpec.setMaxVersions();
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
- KeyValue.COMPARATOR, getCols("a"), scanners);
+ KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(5, results.size());
@@ -89,7 +90,7 @@ public class TestStoreScanner extends Te
scanSpec.setTimeRange(1, 3);
scanSpec.setMaxVersions();
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
- KeyValue.COMPARATOR, getCols("a"), scanners);
+ KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(2, results.size());
@@ -98,7 +99,7 @@ public class TestStoreScanner extends Te
scanSpec.setTimeRange(5, 10);
scanSpec.setMaxVersions();
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
- KeyValue.COMPARATOR, getCols("a"), scanners);
+ KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(1, results.size());
@@ -108,7 +109,7 @@ public class TestStoreScanner extends Te
scanSpec.setTimeRange(0, 10);
scanSpec.setMaxVersions(3);
scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
- KeyValue.COMPARATOR, getCols("a"), scanners);
+ KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(3, results.size());
@@ -130,7 +131,7 @@ public class TestStoreScanner extends Te
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"),
- scanners);
+ scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
@@ -159,7 +160,7 @@ public class TestStoreScanner extends Te
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
KeyValue.COMPARATOR, getCols("a"),
- scanners);
+ scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
scan.next(results);
@@ -188,7 +189,7 @@ public class TestStoreScanner extends Te
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a"), scanners);
+ getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertFalse(scan.next(results));
@@ -209,7 +210,7 @@ public class TestStoreScanner extends Te
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a"), scanners);
+ getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
@@ -237,7 +238,7 @@ public class TestStoreScanner extends Te
StoreScanner scan =
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a"), scanners);
+ getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
// the two put at ts=now will be masked by the 1 delete, and
// since the scan default returns 1 version we'll return the newest
@@ -263,7 +264,7 @@ public class TestStoreScanner extends Te
Scan scanSpec = new Scan(Bytes.toBytes("R1")).setMaxVersions(2);
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a"), scanners);
+ getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(2, results.size());
@@ -280,7 +281,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- null, scanners);
+ null, scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(2, results.size());
@@ -310,7 +311,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- null, scanners);
+ null, scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(5, results.size());
@@ -339,7 +340,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan().setMaxVersions(Integer.MAX_VALUE), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- null, scanners);
+ null, scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(0, results.size());
@@ -389,7 +390,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scanner =
new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- null, scanners, now - 200);
+ null, scanners, now - 200, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
results = new ArrayList<KeyValue>();
assertEquals(true, scanner.next(results));
@@ -413,7 +414,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- null, scanners);
+ null, scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
assertEquals(1, results.size());
@@ -437,7 +438,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a", "d"), scanners);
+ getCols("a", "d"), scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scan.next(results));
@@ -475,7 +476,7 @@ public class TestStoreScanner extends Te
scan.setMaxVersions(1);
StoreScanner scanner =
new StoreScanner(scan, CF, 500, KeyValue.COMPARATOR,
- null, scanners);
+ null, scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scanner.next(results));
@@ -498,7 +499,7 @@ public class TestStoreScanner extends Te
List<KeyValueScanner> scanners = scanFixture(kvs);
StoreScanner scan =
new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a", "d"), scanners);
+ getCols("a", "d"), scanners, DefaultKeyValueAggregator.getInstance());
// Previously a updateReaders twice in a row would cause an NPE. In test this would also
@@ -525,7 +526,7 @@ public class TestStoreScanner extends Te
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
StoreScanner scan =
new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
- getCols("a"), scanners);
+ getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
assertNull(scan.peek());
}
@@ -543,7 +544,7 @@ public class TestStoreScanner extends Te
// scanner with ttl equal to 500
StoreScanner scanner =
new StoreScanner(scan, CF, 500, KeyValue.COMPARATOR,
- null, scanners);
+ null, scanners, DefaultKeyValueAggregator.getInstance());
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(true, scanner.next(results));
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java?rev=1400431&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java Sat Oct 20 12:52:34 2012
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+
+/**
+ * <p>
+ * Dummy test aggregator which takes the String Value of the KV which is
+ * expected to be in lowercase and transforms it to uppercase
+ * </p>
+ */
+public class LowerToUpperAggregator implements KeyValueAggregator {
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public KeyValue process(KeyValue kv) {
+ byte[] newValue;
+ String currentValue = new String(kv.getValue());
+ /**
+ * transform it to uppercase.
+ */
+ String newValueString = currentValue.toUpperCase();
+ newValue = newValueString.getBytes();
+ KeyValue newKv = new KeyValue(kv.getRow(), kv.getFamily(),
+ kv.getQualifier(), kv.getTimestamp(), newValue);
+ return newKv;
+ }
+
+ @Override
+ public MatchCode nextAction(MatchCode origCode) {
+ return origCode;
+ }
+
+ @Override
+ public KeyValue finalizeKeyValues() {
+ return null;
+ }
+}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java?rev=1400431&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -0,0 +1,85 @@
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestKeyValueAggregator {
+ private static byte[] TABLE = Bytes.toBytes("TestKeyValueAggregator");
+ private static byte[] FAMILY = Bytes.toBytes("family");
+ private static byte[] START_KEY = Bytes.toBytes("aaa");
+ private static byte[] END_KEY = Bytes.toBytes("zzz");
+ private static int BLOCK_SIZE = 70;
+
+ private static HBaseTestingUtility TEST_UTIL = null;
+ private static HTableDescriptor TESTTABLEDESC = null;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ SchemaMetrics.setUseTableNameInTest(true);
+ TEST_UTIL = new HBaseTestingUtility();
+ TESTTABLEDESC = new HTableDescriptor(TABLE);
+
+ TESTTABLEDESC.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(10)
+ .setBlockCacheEnabled(true).setBlocksize(BLOCK_SIZE)
+ .setCompressionType(Compression.Algorithm.NONE));
+ TEST_UTIL
+ .getConfiguration()
+ .set("aggregator",
+ "org.apache.hadoop.hbase.regionserver.kvaggregator.LowerToUpperAggregator");
+ }
+
+ @Test
+ public void testDummyKvAggregator() throws Exception {
+ HRegion r = HBaseTestCase.createNewHRegion(TESTTABLEDESC, START_KEY,
+ END_KEY, TEST_UTIL.getConfiguration());
+ Put[] puts = new Put[25];
+ // put some lowercase strings
+ for (int i = 0; i < 25; i++) {
+ byte[] row = Bytes.toBytes("row" + i);
+ Put put = new Put(row);
+ byte[] qualifier = Bytes.toBytes("qual" + i);
+ byte[] value = Bytes.toBytes("ab" + (char) (i + 97));
+ put.add(FAMILY, qualifier, value);
+ puts[i] = put;
+ }
+ r.put(puts);
+ Scan scan = new Scan();
+ InternalScanner s = r.getScanner(scan);
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ while (s.next(results))
+ ;
+ // check if the values in results are in upper case
+ s.close();
+ Assert.assertTrue(checkIfLowerCase(results));
+ // check if we got all 25 results back
+ Assert.assertEquals(25, results.size());
+ }
+
+ public boolean checkIfLowerCase(List<KeyValue> result) {
+ for (KeyValue kv : result) {
+ String currValue = new String(kv.getValue());
+ String uppercase = currValue.toUpperCase();
+ if (!currValue.equals(uppercase)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}