You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/10 14:51:00 UTC
svn commit: r1396575 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver:
DefaultKeyValueAggregator.java KeyValueAggregator.java StoreScanner.java
Author: mbautin
Date: Wed Oct 10 12:51:00 2012
New Revision: 1396575
URL: http://svn.apache.org/viewvc?rev=1396575&view=rev
Log:
[HBASE-6967] KeyValueAggregator interface
Author: adela
Summary:
This is a hook into the StoreScanner to allow aggregation of KeyValues
in a single row.
Test Plan:
Not sure which unit tests should be run for this; I ran
TestGetClosestAtOrBefore.
Revert Plan:
Tags:
- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -
Reviewers: kranganathan, kannan, rvadali
Reviewed By: kannan
CC: hbase-eng@, aaiyer, adela, mbautin
Differential Revision: https://phabricator.fb.com/D551035
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java?rev=1396575&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java Wed Oct 10 12:51:00 2012
@@ -0,0 +1,60 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/** Default pass-through aggregator used by {@link StoreScanner}; it keeps no state. */
+public class DefaultKeyValueAggregator implements KeyValueAggregator {
+
+ // Eagerly initialized so concurrent first calls to getInstance() cannot race.
+ private static final DefaultKeyValueAggregator INSTANCE = new DefaultKeyValueAggregator();
+
+ protected DefaultKeyValueAggregator() {
+ }
+
+ /** @return the shared instance; sharing is safe because it holds no state. */
+ public static DefaultKeyValueAggregator getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ /** Returns {@code kv} unchanged, so every included KeyValue is emitted as-is. */
+ @Override
+ public KeyValue process(KeyValue kv) {
+ return kv;
+ }
+
+ /** Returns {@code origCode} unchanged, keeping the matcher's decision. */
+ @Override
+ public ScanQueryMatcher.MatchCode nextAction(
+ ScanQueryMatcher.MatchCode origCode) {
+ return origCode;
+ }
+
+ /** Buffers nothing, so there is never a trailing KeyValue to emit. */
+ @Override
+ public KeyValue finalizeKeyValues() {
+ return null;
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java?rev=1396575&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java Wed Oct 10 12:51:00 2012
@@ -0,0 +1,73 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * <p>
+ * Used by the {@link StoreScanner} to aggregate KeyValue instances to form a
+ * single row/column. Implementations of this interface can expect the following
+ * call sequence:
+ * </p>
+ * <ul>
+ * <li>reset(): When the scanner starts the row/col.</li>
+ * <li>process(): Called for each KeyValue that {@link ScanQueryMatcher} decides
+ * to include. The aggregator may return that KeyValue, a replacement
+ * (intermediate) KeyValue, or null to make the scanner skip it.
+ * </li>
+ * <li>nextAction(): Called only when process() returned a non-null value; its
+ * result replaces the matcher's code and decides the scanner's next action.
+ * </li>
+ * <li>finalizeKeyValues(): Called when the scanner finishes the row/col. Based
+ * on its accumulated state, the aggregator may return one final KeyValue,
+ * which is appended after the KeyValues already emitted for the row/col.
+ * </li>
+ * </ul>
+ */
+public interface KeyValueAggregator {
+ /**
+ * Called when the {@link StoreScanner} starts its attempt to get a row/col.
+ * Implementations should discard any state left from the previous one.
+ */
+ void reset();
+
+ /**
+ * Makes the aggregator process a single included KeyValue.
+ * @param kv a KeyValue the {@link ScanQueryMatcher} decided to include
+ * @return the KeyValue to emit in its place (possibly {@code kv} itself),
+ * or null to make the scanner skip it
+ */
+ KeyValue process(KeyValue kv);
+
+ /**
+ * Called only if the preceding {@link #process(KeyValue)} returned non-null.
+ * @param origCode the matcher's code for the KeyValue (an INCLUDE variant)
+ * @return the code the scanner should act on instead of {@code origCode}
+ */
+ ScanQueryMatcher.MatchCode nextAction(
+ ScanQueryMatcher.MatchCode origCode);
+
+ /**
+ * Called when the scanner finishes the current row/col, not only at scan end.
+ * @return one final KeyValue to append to the result, or null for none
+ */
+ KeyValue finalizeKeyValues();
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1396575&r1=1396574&r2=1396575&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Oct 10 12:51:00 2012
@@ -31,12 +31,14 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+
/**
* Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
* into List<KeyValue> for a single row.
@@ -59,6 +61,7 @@ class StoreScanner extends NonLazyKeyVal
private final boolean explicitColumnQuery;
private final boolean useRowColBloom;
private final Scan scan;
+ private final KeyValueAggregator keyValueAggregator;
private final NavigableSet<byte[]> columns;
private final long oldestUnexpiredTS;
@@ -82,6 +85,7 @@ class StoreScanner extends NonLazyKeyVal
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
this.scan = scan;
+ this.keyValueAggregator = DefaultKeyValueAggregator.getInstance();
this.columns = columns;
oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
@@ -147,7 +151,7 @@ class StoreScanner extends NonLazyKeyVal
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking
* versions
- * @param retainDeletesInOutput should we retain deletes after compaction?
+ * @param retainDeletesInOutputUntil should we retain deletes after compaction?
*/
StoreScanner(Store store, Scan scan,
List<? extends KeyValueScanner> scanners, long smallestReadPoint,
@@ -331,7 +335,7 @@ class StoreScanner extends NonLazyKeyVal
KeyValue kv;
KeyValue prevKV = null;
int numNewKeyValues = 0;
-
+ keyValueAggregator.reset();
Call call = HRegionServer.callContext.get();
long quotaRemaining = (call == null) ? Long.MAX_VALUE
: HRegionServer.getResponseSizeLimit() - call.getPartialResponseSize();
@@ -354,6 +358,18 @@ class StoreScanner extends NonLazyKeyVal
prevKV = kv;
ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
+ if ((qcode == MatchCode.INCLUDE) ||
+ (qcode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) ||
+ (qcode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+ copyKv = keyValueAggregator.process(copyKv);
+ // A null return is an indication to skip the KV.
+ if (copyKv == null) {
+ qcode = MatchCode.SKIP;
+ } else {
+ qcode = keyValueAggregator.nextAction(qcode);
+ }
+ }
+
switch(qcode) {
case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW:
@@ -363,6 +379,7 @@ class StoreScanner extends NonLazyKeyVal
this.countPerRow > (storeLimit + storeOffset)) {
// do what SEEK_NEXT_ROW does.
if (!matcher.moreRowsMayExistAfter(kv)) {
+ numNewKeyValues += processLastKeyValue(outResult);
return false;
}
reseek(matcher.getKeyForNextRow(kv));
@@ -379,13 +396,13 @@ class StoreScanner extends NonLazyKeyVal
+ HRegionServer.getResponseSizeLimit() + " bytes.");
throw new DoNotRetryIOException("Result too large");
}
-
outResult.add(copyKv);
numNewKeyValues++;
}
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (!matcher.moreRowsMayExistAfter(kv)) {
+ numNewKeyValues += processLastKeyValue(outResult);
return false;
}
reseek(matcher.getKeyForNextRow(kv));
@@ -401,17 +418,19 @@ class StoreScanner extends NonLazyKeyVal
continue;
case DONE:
+ numNewKeyValues += processLastKeyValue(outResult);
return true;
case DONE_SCAN:
close();
-
+ numNewKeyValues += processLastKeyValue(outResult);
return false;
case SEEK_NEXT_ROW:
// This is just a relatively simple end of scan fix, to short-cut end
// us if there is an endKey in the scan.
if (!matcher.moreRowsMayExistAfter(kv)) {
+ numNewKeyValues += processLastKeyValue(outResult);
return false;
}
@@ -464,12 +483,12 @@ class StoreScanner extends NonLazyKeyVal
throw e;
- } finally {
- // update the counter
+ } finally {
+ // update the counter
if (addedResultsSize > 0 && metric != null) {
- HRegion.incrNumericMetric(this.metricNamePrefix + metric,
+ HRegion.incrNumericMetric(this.metricNamePrefix + metric,
addedResultsSize);
- }
+ }
// update the partial results size
if (call != null) {
call.setPartialResponseSize(call.getPartialResponseSize()
@@ -477,6 +496,7 @@ class StoreScanner extends NonLazyKeyVal
}
}
+ numNewKeyValues += processLastKeyValue(outResult);
if (numNewKeyValues > 0) {
return true;
}
@@ -486,6 +506,15 @@ class StoreScanner extends NonLazyKeyVal
return false;
}
+ /**
+ * Asks the aggregator for its final KeyValue of the current row/col and, if
+ * there is one, appends it to {@code outResult}.
+ * @param outResult the list the scanner is filling for this call
+ * @return the number of KeyValues appended (0 or 1)
+ */
+ private int processLastKeyValue(List<KeyValue> outResult) {
+ KeyValue lastKV = keyValueAggregator.finalizeKeyValues();
+ if (lastKV == null) {
+ return 0;
+ }
+ outResult.add(lastKV);
+ return 1;
+ }
+
@Override
public synchronized boolean next(List<KeyValue> outResult) throws IOException {
return next(outResult, -1, null);