You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/20 14:52:35 UTC

svn commit: r1400431 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/

Author: mbautin
Date: Sat Oct 20 12:52:34 2012
New Revision: 1400431

URL: http://svn.apache.org/viewvc?rev=1400431&view=rev
Log:
[HBASE-6967] Compaction hook framework

Author: adela

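Summary: Adds a compaction hook framework. Store reads an optional KeyValueAggregator class name from the "aggregator" configuration key and passes an aggregator instance to every StoreScanner it creates; if no class is configured, it falls back to DefaultKeyValueAggregator. KeyValueAggregator and DefaultKeyValueAggregator move to the new org.apache.hadoop.hbase.regionserver.kvaggregator package.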

Test Plan: Added a dummy aggregator (LowerToUpperAggregator) and a new TestKeyValueAggregator unit test that exercises it.

Reviewers: kannan, kranganathan

Reviewed By: kannan

CC: hbase-eng@, erling

Differential Revision: https://phabricator.fb.com/D598973

Task ID: 1749101

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java
      - copied, changed from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java
      - copied, changed from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -1,60 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * Default aggregator used by {@link StoreScanner}.
- */
-public class DefaultKeyValueAggregator implements KeyValueAggregator {
-
-  private static DefaultKeyValueAggregator instance = null;
-
-  protected DefaultKeyValueAggregator() {
-  }
-
-  public static DefaultKeyValueAggregator getInstance() {
-    if (instance == null) {
-      instance = new DefaultKeyValueAggregator();
-    }
-    return instance;
-  }
-
-  @Override
-  public void reset() {
-  }
-
-  @Override
-  public KeyValue process(KeyValue kv) {
-    return kv;
-  }
-
-  @Override
-  public ScanQueryMatcher.MatchCode nextAction(
-    ScanQueryMatcher.MatchCode origCode) {
-    return origCode;
-  }
-
-  @Override
-  public KeyValue finalizeKeyValues() {
-    return null;
-  }
-}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -1,73 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * <p>
- * Used by the {@link StoreScanner} to aggregate KeyValue instances to form a
- * single row/column. Implementations of this interface can expect the following
- * call sequence:
- * </p>
- * <ul>
- *   <li>reset(): When the scanner starts the row/col</li>
- *   <li>process(): This is called for each included KeyValue as determined by
- *     {@link ScanQueryMatcher}. The aggregator can decide to emit an intermediate
- *     KeyValue.
- *   </li>
- *   <li>nextAction(): The scanner will decide the next action by calling this if
- *     a call to process() returns a non-null value.
- *   </li>
- *   <li>finalizeKeyValues(): The aggregator can manipulate the final row/column
- *     contents that will be returned by the scanner. This lets it use the
- *     intermediate KeyValue instances returned till then and any accumulated state
- *     at the end to create the final row.
- *   </li>
- * </ul>
- */
-public interface KeyValueAggregator {
-  /**
-   * Called when the {@link StoreScanner} starts its attempt to get a row/col.
-   */
-  public void reset();
-
-  /**
-   * Make the aggregator process a single KeyValue.
-   * @param kv
-   * @return
-   */
-  public KeyValue process(KeyValue kv);
-
-  /**
-   * Called if the previous call to
-   * {@link #process(org.apache.hadoop.hbase.KeyValue)} had a non-null
-   * return value.
-   * @param origCode
-   * @return
-   */
-  public ScanQueryMatcher.MatchCode nextAction(
-    ScanQueryMatcher.MatchCode origCode);
-
-  /**
-   * Called at the end of the scan to flush out any remaining KeyValue.
-   */
-  public KeyValue finalizeKeyValues();
-}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Oct 20 12:52:34 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -143,6 +146,8 @@ public class Store extends SchemaConfigu
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
+  private Class<KeyValueAggregator> aggregatorClass = null;
+
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -154,6 +159,7 @@ public class Store extends SchemaConfigu
    * failed.  Can be null.
    * @throws IOException
    */
+  @SuppressWarnings("unchecked")
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
       FileSystem fs, Configuration confParam)
   throws IOException {
@@ -235,6 +241,16 @@ public class Store extends SchemaConfigu
 
     setCompactionPolicy(conf.get(HConstants.COMPACTION_MANAGER_CLASS,
                                  HConstants.DEFAULT_COMPACTION_MANAGER_CLASS));
+
+    String aggregatorString = conf.get("aggregator");
+    if (aggregatorString != null && !aggregatorString.isEmpty()) {
+      try {
+        this.aggregatorClass = ((Class<KeyValueAggregator>) Class
+            .forName(aggregatorString));
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
   }
 
   /**
@@ -484,7 +500,7 @@ public class Store extends SchemaConfigu
       srcPath = tmpPath;
     }
 
-    Path dstPath = StoreFile.getRandomFilename(fs, homedir, 
+    Path dstPath = StoreFile.getRandomFilename(fs, homedir,
         (sequenceId >= 0) ? ("_SeqId_" + sequenceId + "_") : null);
     LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
     StoreFile.rename(fs, srcPath, dstPath);
@@ -637,7 +653,7 @@ public class Store extends SchemaConfigu
     InternalScanner scanner = new StoreScanner(this, scan,
         MemStore.getSnapshotScanners(snapshot, this.comparator),
         this.region.getSmallestReadPoint(),
-        Long.MIN_VALUE); // include all deletes
+        Long.MIN_VALUE, getAggregator()); // include all deletes
 
     String fileName;
     try {
@@ -1151,7 +1167,7 @@ public class Store extends SchemaConfigu
              (System.currentTimeMillis() - this.timeToPurgeDeletes))
           : Long.MIN_VALUE;
         scanner = new StoreScanner(this, scan, scanners, smallestReadPoint,
-            retainDeletesUntil);
+            retainDeletesUntil, getAggregator());
         int bytesWritten = 0;
         // since scanner.next() can return 'false' but still be delivering data,
         // we have to use a do/while loop.
@@ -1620,7 +1636,7 @@ public class Store extends SchemaConfigu
       final NavigableSet<byte []> targetCols) throws IOException {
     lock.readLock().lock();
     try {
-      return new StoreScanner(this, scan, targetCols);
+      return new StoreScanner(this, scan, targetCols, getAggregator());
     } finally {
       lock.readLock().unlock();
     }
@@ -1817,8 +1833,8 @@ public class Store extends SchemaConfigu
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (17 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG)
-          + (4 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN);
+          + (18 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG)
+          + (4 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN) ;
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
@@ -1839,4 +1855,23 @@ public class Store extends SchemaConfigu
                                  HConstants.DEFAULT_COMPACTION_MANAGER_CLASS));
   }
 
+  /**
+   * Returns the aggregator for a new StoreScanner: the shared
+   * DefaultKeyValueAggregator if no "aggregator" class is configured,
+   * otherwise a new instance of the configured class on every call.
+   * @throws IllegalArgumentException if the configured class cannot be
+   *     instantiated (wraps InstantiationException/IllegalAccessException)
+   */
+  private KeyValueAggregator getAggregator() {
+    if (aggregatorClass == null) {
+      return DefaultKeyValueAggregator.getInstance();
+    }
+    try {
+      return aggregatorClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Oct 20 12:52:34 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
 
 
 import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
@@ -43,7 +44,7 @@ import static org.apache.hadoop.hbase.re
  * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
  * into List<KeyValue> for a single row.
  */
-class StoreScanner extends NonLazyKeyValueScanner
+public class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
   private Store store;
@@ -77,7 +78,7 @@ class StoreScanner extends NonLazyKeyVal
 
   /** An internal constructor. */
   private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
-      final NavigableSet<byte[]> columns, long ttl) {
+      final NavigableSet<byte[]> columns, long ttl, KeyValueAggregator keyValueAggregator) {
     this.store = store;
     initializeMetricNames();
     this.cacheBlocks = cacheBlocks;
@@ -85,7 +86,7 @@ class StoreScanner extends NonLazyKeyVal
     int numCol = columns == null ? 0 : columns.size();
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
-    this.keyValueAggregator = DefaultKeyValueAggregator.getInstance();
+    this.keyValueAggregator = keyValueAggregator;
     this.columns = columns;
     oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
 
@@ -105,9 +106,9 @@ class StoreScanner extends NonLazyKeyVal
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
-      throws IOException {
-    this(store, scan.getCacheBlocks(), scan, columns, store.ttl);
+  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns,
+      KeyValueAggregator keyValueAggregator) throws IOException {
+    this(store, scan.getCacheBlocks(), scan, columns, store.ttl, keyValueAggregator);
     matcher =
         new ScanQueryMatcher(scan, store.getFamily().getName(), columns,
             store.comparator.getRawComparator(),
@@ -155,8 +156,8 @@ class StoreScanner extends NonLazyKeyVal
    */
   StoreScanner(Store store, Scan scan,
       List<? extends KeyValueScanner> scanners, long smallestReadPoint,
-      long retainDeletesInOutputUntil) throws IOException {
-    this(store, false, scan, null, store.ttl);
+      long retainDeletesInOutputUntil, KeyValueAggregator keyValueAggregator) throws IOException {
+    this(store, false, scan, null, store.ttl, keyValueAggregator);
 
     matcher =
         new ScanQueryMatcher(scan, store.getFamily().getName(), null,
@@ -180,9 +181,10 @@ class StoreScanner extends NonLazyKeyVal
   StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
       final KeyValue.KVComparator comparator,
       final NavigableSet<byte[]> columns,
-      final List<KeyValueScanner> scanners)
+      final List<KeyValueScanner> scanners,
+      final KeyValueAggregator keyValueAggregator)
         throws IOException {
-    this(scan, colFamily, ttl, comparator, columns, scanners, Long.MAX_VALUE);
+    this(scan, colFamily, ttl, comparator, columns, scanners, Long.MAX_VALUE, keyValueAggregator);
   }
 
   /** Constructor for testing. */
@@ -190,9 +192,10 @@ class StoreScanner extends NonLazyKeyVal
       final KeyValue.KVComparator comparator,
       final NavigableSet<byte[]> columns,
       final List<KeyValueScanner> scanners,
-      final long retainDeletesInOutputUntil)
+      final long retainDeletesInOutputUntil,
+      final KeyValueAggregator keyValueAggregator)
         throws IOException {
-    this(null, scan.getCacheBlocks(), scan, columns, ttl);
+    this(null, scan.getCacheBlocks(), scan, columns, ttl, keyValueAggregator);
     this.matcher =
         new ScanQueryMatcher(scan, colFamily, columns,
             comparator.getRawComparator(), scan.getMaxVersions(),

Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java (from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java&r1=1400430&r2=1400431&rev=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultKeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/DefaultKeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -17,24 +17,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher;
 
 /**
  * Default aggregator used by {@link StoreScanner}.
  */
 public class DefaultKeyValueAggregator implements KeyValueAggregator {
 
-  private static DefaultKeyValueAggregator instance = null;
+  static DefaultKeyValueAggregator instance = new DefaultKeyValueAggregator();
 
   protected DefaultKeyValueAggregator() {
   }
 
-  public static DefaultKeyValueAggregator getInstance() {
-    if (instance == null) {
-      instance = new DefaultKeyValueAggregator();
-    }
+  public static KeyValueAggregator getInstance() {
     return instance;
   }
 
@@ -57,4 +55,5 @@ public class DefaultKeyValueAggregator i
   public KeyValue finalizeKeyValues() {
     return null;
   }
+
 }

Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java (from r1400430, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java&r1=1400430&r2=1400431&rev=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/kvaggregator/KeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -17,9 +17,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher;
 
 /**
  * <p>

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java Sat Oct 20 12:52:34 2012
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MD5Hash;
 import org.apache.hadoop.util.StringUtils;
@@ -417,7 +418,8 @@ public class HFileReadWriteTest {
       Scan scan = new Scan();
 
       // Include deletes
-      scanner = new StoreScanner(store, scan, scanners, Long.MAX_VALUE, Long.MIN_VALUE);
+      scanner = new StoreScanner(store, scan, scanners, Long.MAX_VALUE,
+          Long.MIN_VALUE, DefaultKeyValueAggregator.getInstance());
       ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
 
       while (scanner.next(kvs) || kvs.size() != 0) {

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Sat Oct 20 12:52:34 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -63,8 +64,8 @@ public class TestMemStore extends TestCa
     super.setUp();
     this.mvcc = new MultiVersionConsistencyControl();
     this.memstore = new MemStore();
-		SchemaMetrics.configureGlobally(HBaseConfiguration.create());
-	}
+    SchemaMetrics.configureGlobally(HBaseConfiguration.create());
+  }
 
   public void testPutSameKey() {
     byte [] bytes = Bytes.toBytes(getName());
@@ -90,7 +91,7 @@ public class TestMemStore extends TestCa
     List<KeyValue> result = new ArrayList<KeyValue>();
     MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
     StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
-      this.memstore.comparator, null, memstorescanners);
+      this.memstore.comparator, null, memstorescanners, DefaultKeyValueAggregator.getInstance());
     int count = 0;
     try {
       while (s.next(result)) {
@@ -112,7 +113,7 @@ public class TestMemStore extends TestCa
     memstorescanners = this.memstore.getScanners();
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
-      this.memstore.comparator, null, memstorescanners);
+      this.memstore.comparator, null, memstorescanners, DefaultKeyValueAggregator.getInstance());
     count = 0;
     try {
       while (s.next(result)) {
@@ -139,7 +140,7 @@ public class TestMemStore extends TestCa
     // Assert that new values are seen in kvset as we scan.
     long ts = System.currentTimeMillis();
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
-      this.memstore.comparator, null, memstorescanners);
+      this.memstore.comparator, null, memstorescanners, DefaultKeyValueAggregator.getInstance());
     count = 0;
     int snapshotIndex = 5;
     try {
@@ -556,7 +557,7 @@ public class TestMemStore extends TestCa
       InternalScanner scanner =
           new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
               Integer.MAX_VALUE, this.memstore.comparator, null,
-              memstore.getScanners());
+              memstore.getScanners(), DefaultKeyValueAggregator.getInstance());
       List<KeyValue> results = new ArrayList<KeyValue>();
       for (int i = 0; scanner.next(results); i++) {
         int rowId = startRowId + i;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=1400431&r1=1400430&r2=1400431&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Sat Oct 20 12:52:34 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.kvaggregator.DefaultKeyValueAggregator;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -79,7 +80,7 @@ public class TestStoreScanner extends Te
     scanSpec.setMaxVersions();
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
-        KeyValue.COMPARATOR, getCols("a"), scanners);
+        KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(5, results.size());
@@ -89,7 +90,7 @@ public class TestStoreScanner extends Te
     scanSpec.setTimeRange(1, 3);
     scanSpec.setMaxVersions();
     scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
-      KeyValue.COMPARATOR, getCols("a"), scanners);
+      KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(2, results.size());
@@ -98,7 +99,7 @@ public class TestStoreScanner extends Te
     scanSpec.setTimeRange(5, 10);
     scanSpec.setMaxVersions();
     scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
-      KeyValue.COMPARATOR, getCols("a"), scanners);
+      KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(1, results.size());
@@ -108,7 +109,7 @@ public class TestStoreScanner extends Te
     scanSpec.setTimeRange(0, 10);
     scanSpec.setMaxVersions(3);
     scan = new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
-      KeyValue.COMPARATOR, getCols("a"), scanners);
+      KeyValue.COMPARATOR, getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(3, results.size());
@@ -130,7 +131,7 @@ public class TestStoreScanner extends Te
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
           KeyValue.COMPARATOR, getCols("a"),
-          scanners);
+          scanners, DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
@@ -159,7 +160,7 @@ public class TestStoreScanner extends Te
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE,
           KeyValue.COMPARATOR, getCols("a"),
-          scanners);
+          scanners, DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     scan.next(results);
@@ -188,7 +189,7 @@ public class TestStoreScanner extends Te
     Scan scanSpec = new Scan(Bytes.toBytes("R1"));
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          getCols("a"), scanners);
+          getCols("a"), scanners,  DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertFalse(scan.next(results));
@@ -209,7 +210,7 @@ public class TestStoreScanner extends Te
     Scan scanSpec = new Scan(Bytes.toBytes("R1"));
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          getCols("a"), scanners);
+          getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
@@ -237,7 +238,7 @@ public class TestStoreScanner extends Te
 
     StoreScanner scan =
       new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          getCols("a"), scanners);
+          getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     // the two put at ts=now will be masked by the 1 delete, and
     // since the scan default returns 1 version we'll return the newest
@@ -263,7 +264,7 @@ public class TestStoreScanner extends Te
     Scan scanSpec = new Scan(Bytes.toBytes("R1")).setMaxVersions(2);
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          getCols("a"), scanners);
+          getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(2, results.size());
@@ -280,7 +281,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scan =
       new StoreScanner(new Scan(Bytes.toBytes("R1")), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          null, scanners);
+          null, scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(2, results.size());
@@ -310,7 +311,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scan =
       new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          null, scanners);
+          null, scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(5, results.size());
@@ -339,7 +340,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scan =
       new StoreScanner(new Scan().setMaxVersions(Integer.MAX_VALUE), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          null, scanners);
+          null, scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(0, results.size());
@@ -389,7 +390,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scanner =
       new StoreScanner(new Scan().setMaxVersions(2), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          null, scanners, now - 200);
+          null, scanners, now - 200, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     results = new ArrayList<KeyValue>();
     assertEquals(true, scanner.next(results));
@@ -413,7 +414,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scan =
       new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          null, scanners);
+          null, scanners, DefaultKeyValueAggregator.getInstance());
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
     assertEquals(1, results.size());
@@ -437,7 +438,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scan =
       new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          getCols("a", "d"), scanners);
+          getCols("a", "d"), scanners, DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scan.next(results));
@@ -475,7 +476,7 @@ public class TestStoreScanner extends Te
     scan.setMaxVersions(1);
     StoreScanner scanner =
       new StoreScanner(scan, CF, 500, KeyValue.COMPARATOR,
-          null, scanners);
+          null, scanners, DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scanner.next(results));
@@ -498,7 +499,7 @@ public class TestStoreScanner extends Te
     List<KeyValueScanner> scanners = scanFixture(kvs);
     StoreScanner scan =
         new StoreScanner(new Scan(), CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-            getCols("a", "d"), scanners);
+            getCols("a", "d"), scanners, DefaultKeyValueAggregator.getInstance());
 
 
     // Previously a updateReaders twice in a row would cause an NPE.  In test this would also
@@ -525,7 +526,7 @@ public class TestStoreScanner extends Te
     Scan scanSpec = new Scan(Bytes.toBytes("R1"));
     StoreScanner scan =
       new StoreScanner(scanSpec, CF, Long.MAX_VALUE, KeyValue.COMPARATOR,
-          getCols("a"), scanners);
+          getCols("a"), scanners, DefaultKeyValueAggregator.getInstance());
     assertNull(scan.peek());
   }
 
@@ -543,7 +544,7 @@ public class TestStoreScanner extends Te
     // scanner with ttl equal to 500
     StoreScanner scanner =
         new StoreScanner(scan, CF, 500, KeyValue.COMPARATOR,
-            null, scanners);
+            null, scanners, DefaultKeyValueAggregator.getInstance());
 
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(true, scanner.next(results));

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java?rev=1400431&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/LowerToUpperAggregator.java Sat Oct 20 12:52:34 2012
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
+
+import java.util.Locale;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <p>
+ * Dummy test aggregator that rewrites every processed {@link KeyValue} so that
+ * its value is the upper-case form of the original value (the test feeds it
+ * lowercase values). Row, family, qualifier, timestamp and type are copied
+ * from the input unchanged.
+ * </p>
+ * <p>
+ * Values are decoded/encoded as UTF-8 and upper-cased with
+ * {@link Locale#ENGLISH}, so the output does not depend on the JVM's default
+ * charset or locale.
+ * </p>
+ */
+public class LowerToUpperAggregator implements KeyValueAggregator {
+
+  /** Keeps no state between KeyValues, so there is nothing to reset. */
+  @Override
+  public void reset() {
+  }
+
+  /**
+   * Returns a copy of {@code kv} whose value has been upper-cased.
+   *
+   * @param kv the KeyValue to transform
+   * @return a new KeyValue with the same row, family, qualifier, timestamp
+   *         and type as {@code kv}, and an upper-cased value
+   */
+  @Override
+  public KeyValue process(KeyValue kv) {
+    // Explicit UTF-8 and a fixed locale: the platform defaults would make the
+    // result environment-dependent (e.g. 'i' -> dotted capital I in Turkish).
+    String upperValue = Bytes.toString(kv.getValue()).toUpperCase(Locale.ENGLISH);
+    // Carry the original type over; the 5-arg constructor would always
+    // produce a Put and could turn a delete marker into live data.
+    return new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(),
+        kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()),
+        Bytes.toBytes(upperValue));
+  }
+
+  /**
+   * Leaves the scanner's match decision untouched.
+   *
+   * @param origCode the code computed by the caller
+   * @return {@code origCode} unchanged
+   */
+  @Override
+  public MatchCode nextAction(MatchCode origCode) {
+    return origCode;
+  }
+
+  /**
+   * This aggregator buffers nothing, so there is no trailing KeyValue to emit.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public KeyValue finalizeKeyValues() {
+    return null;
+  }
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java?rev=1400431&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/kvaggregator/TestKeyValueAggregator.java Sat Oct 20 12:52:34 2012
@@ -0,0 +1,85 @@
+package org.apache.hadoop.hbase.regionserver.kvaggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestKeyValueAggregator {
+  private static byte[] TABLE = Bytes.toBytes("TestKeyValueAggregator");
+  private static byte[] FAMILY = Bytes.toBytes("family");
+  private static byte[] START_KEY = Bytes.toBytes("aaa");
+  private static byte[] END_KEY = Bytes.toBytes("zzz");
+  private static int BLOCK_SIZE = 70;
+
+  private static HBaseTestingUtility TEST_UTIL = null;
+  private static HTableDescriptor TESTTABLEDESC = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    SchemaMetrics.setUseTableNameInTest(true);
+    TEST_UTIL = new HBaseTestingUtility();
+    TESTTABLEDESC = new HTableDescriptor(TABLE);
+
+    TESTTABLEDESC.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(10)
+        .setBlockCacheEnabled(true).setBlocksize(BLOCK_SIZE)
+        .setCompressionType(Compression.Algorithm.NONE));
+    TEST_UTIL
+        .getConfiguration()
+        .set("aggregator",
+            "org.apache.hadoop.hbase.regionserver.kvaggregator.LowerToUpperAggregator");
+  }
+
+  @Test
+  public void testDummyKvAggregator() throws Exception {
+    HRegion r = HBaseTestCase.createNewHRegion(TESTTABLEDESC, START_KEY,
+        END_KEY, TEST_UTIL.getConfiguration());
+    Put[] puts = new Put[25];
+    // put some lowercase strings
+    for (int i = 0; i < 25; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put put = new Put(row);
+      byte[] qualifier = Bytes.toBytes("qual" + i);
+      byte[] value = Bytes.toBytes("ab" + (char) (i + 97));
+      put.add(FAMILY, qualifier, value);
+      puts[i] = put;
+    }
+    r.put(puts);
+    Scan scan = new Scan();
+    InternalScanner s = r.getScanner(scan);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    while (s.next(results))
+      ;
+    // check if the values in results are in upper case
+    s.close();
+    Assert.assertTrue(checkIfLowerCase(results));
+    // check if we got all 25 results back
+    Assert.assertEquals(25, results.size());
+  }
+
+  public boolean checkIfLowerCase(List<KeyValue> result) {
+    for (KeyValue kv : result) {
+      String currValue = new String(kv.getValue());
+      String uppercase = currValue.toUpperCase();
+      if (!currValue.equals(uppercase)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}