You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/10 14:51:00 UTC

svn commit: r1396575 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver: DefaultKeyValueAggregator.java KeyValueAggregator.java StoreScanner.java

Author: mbautin
Date: Wed Oct 10 12:51:00 2012
New Revision: 1396575

URL: http://svn.apache.org/viewvc?rev=1396575&view=rev
Log:
[HBASE-6967] KeyValueAggregator interface

Author: adela

Summary:
This is a hook into the StoreScanner to allow aggregation of KeyValues
in a single row.

Test Plan:
Not sure which unit tests should be run for this; I ran
TestGetClosestAtOrBefore.

Revert Plan:

Tags:

- begin *PUBLIC* platform impact section -
Bugzilla: #
- end platform impact -

Reviewers: kranganathan, kannan, rvadali

Reviewed By: kannan

CC: hbase-eng@, aaiyer, adela, mbautin

Differential Revision: https://phabricator.fb.com/D551035

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java?rev=1396575&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java Wed Oct 10 12:51:00 2012
@@ -0,0 +1,60 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Default aggregator used by {@link StoreScanner}. It is a pass-through:
+ * every included KeyValue is returned unchanged, the matcher's original
+ * {@link ScanQueryMatcher.MatchCode} is kept, and no extra KeyValue is
+ * emitted when a row is finalized.
+ * <p>
+ * The class has no fields, so a single shared instance can be used by any
+ * number of scanners concurrently.
+ */
+public class DefaultKeyValueAggregator implements KeyValueAggregator {
+
+  /**
+   * Eagerly created shared instance. Class initialization guarantees it is
+   * constructed exactly once and safely published to all threads. A lazy
+   * unsynchronized null-check could construct several instances under
+   * concurrent calls to {@link #getInstance()}.
+   */
+  private static final DefaultKeyValueAggregator INSTANCE =
+      new DefaultKeyValueAggregator();
+
+  /**
+   * Protected rather than private so subclasses can be created; other callers
+   * should use {@link #getInstance()}.
+   */
+  protected DefaultKeyValueAggregator() {
+  }
+
+  /**
+   * @return the shared default aggregator instance
+   */
+  public static DefaultKeyValueAggregator getInstance() {
+    return INSTANCE;
+  }
+
+  /** No-op: this aggregator keeps no per-row state. */
+  @Override
+  public void reset() {
+  }
+
+  /**
+   * @param kv a KeyValue the matcher decided to include
+   * @return {@code kv} unchanged, so it is never skipped or replaced
+   */
+  @Override
+  public KeyValue process(KeyValue kv) {
+    return kv;
+  }
+
+  /**
+   * @param origCode the match code computed by the {@link ScanQueryMatcher}
+   * @return {@code origCode} unchanged
+   */
+  @Override
+  public ScanQueryMatcher.MatchCode nextAction(
+    ScanQueryMatcher.MatchCode origCode) {
+    return origCode;
+  }
+
+  /**
+   * @return always {@code null}: there is nothing accumulated to flush
+   */
+  @Override
+  public KeyValue finalizeKeyValues() {
+    return null;
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java?rev=1396575&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java Wed Oct 10 12:51:00 2012
@@ -0,0 +1,73 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * <p>
+ * Used by the {@link StoreScanner} to aggregate KeyValue instances to form a
+ * single row/column. Implementations of this interface can expect the following
+ * call sequence:
+ * </p>
+ * <ul>
+ *   <li>reset(): called when the scanner starts assembling a row/column.</li>
+ *   <li>process(): called for each KeyValue that {@link ScanQueryMatcher}
+ *     decided to include. The aggregator may return the KeyValue as-is, return
+ *     a different (e.g. intermediate) KeyValue, or return {@code null} to make
+ *     the scanner skip it.
+ *   </li>
+ *   <li>nextAction(): called only when process() returned a non-null value.
+ *     The scanner acts on the match code it returns.
+ *   </li>
+ *   <li>finalizeKeyValues(): called when the scanner finishes the current
+ *     row/column. The aggregator can use the intermediate KeyValue instances
+ *     returned so far, plus any accumulated state, to emit one final KeyValue.
+ *   </li>
+ * </ul>
+ * <p>
+ * NOTE(review): an implementation that keeps per-row state must not be
+ * shared between scanners that run concurrently.
+ * </p>
+ */
+public interface KeyValueAggregator {
+  /**
+   * Called when the {@link StoreScanner} starts its attempt to get a row/col.
+   * Implementations that keep per-row state should discard it here.
+   */
+  void reset();
+
+  /**
+   * Make the aggregator process a single KeyValue that the
+   * {@link ScanQueryMatcher} decided to include.
+   * @param kv the included KeyValue
+   * @return the KeyValue the scanner should use in place of {@code kv} (which
+   *     may be {@code kv} itself), or {@code null} to have the scanner skip it
+   */
+  KeyValue process(KeyValue kv);
+
+  /**
+   * Called if the previous call to
+   * {@link #process(org.apache.hadoop.hbase.KeyValue)} had a non-null
+   * return value. It lets the aggregator override the scanner's next action.
+   * @param origCode the match code originally computed by the
+   *     {@link ScanQueryMatcher} (one of the INCLUDE variants)
+   * @return the match code the scanner should act on
+   */
+  ScanQueryMatcher.MatchCode nextAction(
+    ScanQueryMatcher.MatchCode origCode);
+
+  /**
+   * Called whenever the {@link StoreScanner} finishes the current row/column,
+   * that is, before each return from its fetch, not only at the end of the
+   * whole scan. It flushes out any remaining KeyValue.
+   * @return a final KeyValue to append to the results, or {@code null} if
+   *     there is nothing to add
+   */
+  KeyValue finalizeKeyValues();
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1396575&r1=1396574&r2=1396575&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Oct 10 12:51:00 2012
@@ -31,12 +31,14 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+
+import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+
 /**
  * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
  * into List<KeyValue> for a single row.
@@ -59,6 +61,7 @@ class StoreScanner extends NonLazyKeyVal
   private final boolean explicitColumnQuery;
   private final boolean useRowColBloom;
   private final Scan scan;
+  private final KeyValueAggregator keyValueAggregator;
   private final NavigableSet<byte[]> columns;
   private final long oldestUnexpiredTS;
 
@@ -82,6 +85,7 @@ class StoreScanner extends NonLazyKeyVal
     int numCol = columns == null ? 0 : columns.size();
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
+    this.keyValueAggregator = DefaultKeyValueAggregator.getInstance();
     this.columns = columns;
     oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
 
@@ -147,7 +151,7 @@ class StoreScanner extends NonLazyKeyVal
    * @param scanners ancillary scanners
    * @param smallestReadPoint the readPoint that we should use for tracking
    *          versions
-   * @param retainDeletesInOutput should we retain deletes after compaction?
+   * @param retainDeletesInOutputUntil should we retain deletes after compaction?
    */
   StoreScanner(Store store, Scan scan,
       List<? extends KeyValueScanner> scanners, long smallestReadPoint,
@@ -331,7 +335,7 @@ class StoreScanner extends NonLazyKeyVal
     KeyValue kv;
     KeyValue prevKV = null;
     int numNewKeyValues = 0;
-
+    keyValueAggregator.reset();
     Call call = HRegionServer.callContext.get();
     long quotaRemaining = (call == null) ? Long.MAX_VALUE
         : HRegionServer.getResponseSizeLimit() - call.getPartialResponseSize();
@@ -354,6 +358,18 @@ class StoreScanner extends NonLazyKeyVal
         prevKV = kv;
         ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
 
+        if ((qcode == MatchCode.INCLUDE) ||
+          (qcode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) ||
+          (qcode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW)) {
+          copyKv = keyValueAggregator.process(copyKv);
+          // A null return is an indication to skip the KV.
+          if (copyKv == null) {
+            qcode = MatchCode.SKIP;
+          } else {
+            qcode = keyValueAggregator.nextAction(qcode);
+          }
+        }
+
         switch(qcode) {
           case INCLUDE:
           case INCLUDE_AND_SEEK_NEXT_ROW:
@@ -363,6 +379,7 @@ class StoreScanner extends NonLazyKeyVal
                 this.countPerRow > (storeLimit + storeOffset)) {
               // do what SEEK_NEXT_ROW does.
               if (!matcher.moreRowsMayExistAfter(kv)) {
+                numNewKeyValues += processLastKeyValue(outResult);
                 return false;
               }
               reseek(matcher.getKeyForNextRow(kv));
@@ -379,13 +396,13 @@ class StoreScanner extends NonLazyKeyVal
                     + HRegionServer.getResponseSizeLimit() + " bytes.");
                 throw new DoNotRetryIOException("Result too large");
               }
-
               outResult.add(copyKv);
               numNewKeyValues++;
             }
 
             if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
               if (!matcher.moreRowsMayExistAfter(kv)) {
+                numNewKeyValues += processLastKeyValue(outResult);
                 return false;
               }
               reseek(matcher.getKeyForNextRow(kv));
@@ -401,17 +418,19 @@ class StoreScanner extends NonLazyKeyVal
             continue;
 
           case DONE:
+            numNewKeyValues += processLastKeyValue(outResult);
             return true;
 
           case DONE_SCAN:
             close();
-
+            numNewKeyValues += processLastKeyValue(outResult);
             return false;
 
           case SEEK_NEXT_ROW:
             // This is just a relatively simple end of scan fix, to short-cut end
             // us if there is an endKey in the scan.
             if (!matcher.moreRowsMayExistAfter(kv)) {
+              numNewKeyValues += processLastKeyValue(outResult);
               return false;
             }
 
@@ -464,12 +483,12 @@ class StoreScanner extends NonLazyKeyVal
 
       throw e;
 
-    } finally { 
-      // update the counter 
+    } finally {
+      // update the counter
       if (addedResultsSize > 0 && metric != null) {
-        HRegion.incrNumericMetric(this.metricNamePrefix + metric, 
+        HRegion.incrNumericMetric(this.metricNamePrefix + metric,
             addedResultsSize);
-      } 
+      }
       // update the partial results size
       if (call != null) {
         call.setPartialResponseSize(call.getPartialResponseSize()
@@ -477,6 +496,7 @@ class StoreScanner extends NonLazyKeyVal
       }
     }
 
+    numNewKeyValues += processLastKeyValue(outResult);
     if (numNewKeyValues > 0) {
       return true;
     }
@@ -486,6 +506,15 @@ class StoreScanner extends NonLazyKeyVal
     return false;
   }
 
+  /**
+   * Asks the aggregator for its final KeyValue for the current row and
+   * appends it to {@code outResult} if there is one.
+   * NOTE(review): the appended KeyValue is not counted against the response
+   * size quota or the result-size metrics maintained by the caller.
+   * @param outResult list the finalized KeyValue is appended to
+   * @return the number of KeyValues appended (0 or 1)
+   */
+  private int processLastKeyValue(List<KeyValue> outResult) {
+    KeyValue lastKV = keyValueAggregator.finalizeKeyValues();
+    if (lastKV == null) {
+      return 0;
+    }
+    outResult.add(lastKV);
+    return 1;
+  }
+
   @Override
   public synchronized boolean next(List<KeyValue> outResult) throws IOException {
     return next(outResult, -1, null);