You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/02 20:18:27 UTC

svn commit: r1591995 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org...

Author: liyin
Date: Fri May  2 18:18:27 2014
New Revision: 1591995

URL: http://svn.apache.org/r1591995
Log:
[master] Add LongAggregator endpoint.

Author: daviddeng

Summary:
Implements min/max/sum functions.
Some refactoring.

Test Plan: `TestLongAggregator`

Reviewers: adela, manukranthk, gauravm

Reviewed By: adela

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1297685

Task ID: 4223215

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java Fri May  2 18:18:27 2014
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 
 /**
@@ -44,7 +44,7 @@ public class EndpointLib {
   /**
    * Aggregates all KeyValue's in a region defined by a Scan.
    */
-  public static void aggregateScan(HRegion region, Scan scan, IAggregator aggr)
+  public static void aggregateScan(HRegionIf region, Scan scan, IAggregator aggr)
       throws IOException {
     try (InternalScanner scanner = region.getScanner(scan)) {
       ArrayList<KeyValue> kvs = new ArrayList<>();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Fri May  2 18:18:27 2014
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager.EndpointInfo;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
 import org.apache.hadoop.hbase.regionserver.HRegionServerIf;
 
 /**
@@ -62,7 +62,7 @@ public class EndpointServer implements I
       // Set the context.
       ep.setContext(new IEndpointContext() {
         @Override
-        public HRegion getRegion() throws NotServingRegionException {
+        public HRegionIf getRegion() throws NotServingRegionException {
           return EndpointServer.this.server.getRegion(regionName);
         }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java Fri May  2 18:18:27 2014
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.coprocessor.endpoints;
 
+import java.io.IOException;
+
 /**
  * The common parent of all endpoint interfaces.
  */
@@ -31,5 +33,5 @@ public interface IEndpoint {
    * @param a
    *          non-null IEndpointContext
    */
-  void setContext(IEndpointContext context);
+  void setContext(IEndpointContext context) throws IOException;
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java Fri May  2 18:18:27 2014
@@ -20,7 +20,7 @@
 package org.apache.hadoop.hbase.coprocessor.endpoints;
 
 import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
 
 /**
  * The context of an endpoint calling.
@@ -34,7 +34,7 @@ public interface IEndpointContext {
    * @throws NotServingRegionException
    *           if the region is not served on this server.
    */
-  HRegion getRegion() throws NotServingRegionException;
+  HRegionIf getRegion() throws NotServingRegionException;
 
   /**
    * The start row, inclusive, within this region of this call.

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java?rev=1591995&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java Fri May  2 18:18:27 2014
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.io.IOException;
+
+/**
+ * The endpoint that aggregates values.
+ */
+public interface ILongAggregator extends IEndpoint {
+  /**
+   * Sum of the values. If no values found, 0 is returned.
+   *
+   * @param family the column family name. All families if empty.
+   * @param qualifier the column name. All columns if empty.
+   * @param offset the offset of the long in the value in bytes.
+   */
+  public long sum(byte[] family, byte[] qualifier, int offset)
+      throws IOException;
+
+  /**
+   * Min of the values. If no values found, Long.MAX_VALUE is returned.
+   *
+   * @param family the column family name. All families if empty.
+   * @param qualifier the column name. All columns if empty.
+   * @param offset the offset of the long in the value in bytes.
+   */
+  public long min(byte[] family, byte[] qualifier, int offset)
+      throws IOException;
+
+  /**
+   * Max of the values. If no values found, Long.MIN_VALUE is returned.
+   *
+   * @param family the column family name. All families if empty.
+   * @param qualifier the column name. All columns if empty.
+   * @param offset the offset of the long in the value in bytes.
+   */
+  public long max(byte[] family, byte[] qualifier, int offset)
+      throws IOException;
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java?rev=1591995&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java Fri May  2 18:18:27 2014
@@ -0,0 +1,146 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implementation of ILongAggregator for aggregation of sum/min/max
+ */
+public class LongAggregator implements ILongAggregator {
+
+  /**
+   * The factory creating LongAggregator instances for ILongAggregator.
+   */
+  public static class Factory implements IEndpointFactory<ILongAggregator> {
+    @Override
+    public ILongAggregator create() {
+      return new LongAggregator();
+    }
+
+    @Override
+    public Class<ILongAggregator> getEndpointInterface() {
+      return ILongAggregator.class;
+    }
+  }
+
+  private HRegionIf region;
+  @Override
+  public void setContext(IEndpointContext context) throws IOException {
+    this.region = context.getRegion();
+  }
+
+  private interface IUpdater {
+    /**
+     * Updates current with newValue and returns the result.
+     */
+    long update(long current, long newValue);
+  }
+
+  /**
+   * Scans the region and folds the long at offset of every value found.
+   *
+   * @param family the family to scan; all families if null or empty.
+   * @param qualifier the column to scan; the whole family if null or empty.
+   *          Ignored when no family is given.
+   * @param offset the offset of the long in the value in bytes.
+   * @param initValue the initial value for the aggregating variable.
+   * @throws IndexOutOfBoundsException if a value holds no long at offset.
+   */
+  private long calculate(byte[] family, byte[] qualifier, int offset,
+      IUpdater updater, long initValue) throws IOException {
+    Scan.Builder builder = new Scan.Builder();
+    if (family != null && family.length > 0) {
+      if (qualifier != null && qualifier.length > 0) {
+        // Restrict the scan to the single requested column.
+        builder.addColumn(family, qualifier);
+      } else {
+        builder.addFamily(family);
+      }
+    }
+
+    try (InternalScanner scanner = this.region.getScanner(builder.create())) {
+      long res = initValue;
+      List<KeyValue> kvs = new ArrayList<>();
+      boolean hasMore = true;
+      while (hasMore) {
+        kvs.clear();
+        hasMore = scanner.next(kvs);
+        for (KeyValue kv : kvs) {
+          // Overflow-safe bound check; also rejects negative offsets.
+          if (offset < 0 || offset > kv.getValueLength() - Bytes.LONG_BYTES) {
+            throw new IndexOutOfBoundsException("offset " + offset
+                + " out of range for value of length " + kv.getValueLength());
+          }
+          long vl = Bytes.toLong(kv.getBuffer(), kv.getValueOffset() + offset);
+          res = updater.update(res, vl);
+        }
+      }
+      return res;
+    }
+  }
+
+  private static final IUpdater SUM_UPDATER = new IUpdater() {
+    @Override
+    public long update(long current, long newValue) {
+      return current + newValue;
+    }
+  };
+
+  @Override
+  public long sum(byte[] family, byte[] qualifier, int offset)
+      throws IOException {
+    return calculate(family, qualifier, offset, SUM_UPDATER, 0L);
+  }
+
+  private static final IUpdater MIN_UPDATER = new IUpdater() {
+    @Override
+    public long update(long current, long newValue) {
+      return Math.min(current, newValue);
+    }
+  };
+
+  @Override
+  public long min(byte[] family, byte[] qualifier, int offset)
+      throws IOException {
+    return calculate(family, qualifier, offset, MIN_UPDATER, Long.MAX_VALUE);
+  }
+
+  private static final IUpdater MAX_UPDATER = new IUpdater() {
+    @Override
+    public long update(long current, long newValue) {
+      return Math.max(current, newValue);
+    }
+  };
+
+  @Override
+  public long max(byte[] family, byte[] qualifier, int offset)
+      throws IOException {
+    return calculate(family, qualifier, offset, MAX_UPDATER, Long.MIN_VALUE);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java Fri May  2 18:18:27 2014
@@ -60,17 +60,6 @@ import com.google.common.util.concurrent
 public interface ThriftHRegionInterface extends ThriftClientInterface,
     IEndpointService, IRegionScanService {
 
-  /**
-   * Opens a scanner, optionally returns some data if numberOfRows > 0.
-   *
-   * @param regionName the name of the region to scan
-   * @param scan the Scan instance defining scan query.
-   * @param numberOfRows maximum number of rows to return after successfully
-   *          open the scanner.
-   * @return the result as a ScannerResult.
-   *         The length of the Result list of the return value could be empty
-   *         and EOR is set to true for sure in this case.
-   */
   @Override
   @ThriftMethod(value = "scanOpen", exception = {
       @ThriftException(type = ThriftHBaseException.class, id = 1) })
@@ -79,15 +68,6 @@ public interface ThriftHRegionInterface 
       @ThriftField(name = "numberOfRows") int numberOfRows)
       throws ThriftHBaseException;
 
-  /**
-   * Returns next scanning results.
-   *
-   * @param ID the ID of the scanner
-   * @param numberOfRows maximum number of rows to return,
-   * @return the result as a ScannerResult.
-   *         The length of the Result list of the return value could be empty
-   *         and EOR is set to true for sure in this case.
-   */
   @Override
   @ThriftMethod(value = "scanNext", exception = {
       @ThriftException(type = ThriftHBaseException.class, id = 1) })
@@ -95,12 +75,6 @@ public interface ThriftHRegionInterface 
       @ThriftField(name = "numberOfRows") int numberOfRows)
       throws ThriftHBaseException;
 
-  /**
-   * Closes the scanner on the server side.
-   *
-   * @param id the ID of the scanner to close
-   * @return true if a scanner is closed. false if the scanner doesn't exist.
-   */
   @Override
   @ThriftMethod(value = "scanClose", exception = {
       @ThriftException(type = ThriftHBaseException.class, id = 1) })

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri May  2 18:18:27 2014
@@ -1789,16 +1789,7 @@ public class HRegion implements HeapSize
     }
   }
 
-  /**
-   * Return an iterator that scans over the HRegion, returning the indicated
-   * columns and rows specified by the {@link Scan}.
-   * <p>
-   * This Iterator must be closed by the caller.
-   *
-   * @param scan configured {@link Scan}
-   * @return InternalScanner
-   * @throws IOException read exceptions
-   */
+  @Override
   public InternalScanner getScanner(Scan scan) throws IOException {
     return getScanner(scan, null);
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java Fri May  2 18:18:27 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -34,7 +35,7 @@ public interface HRegionIf {
   /**
    * @return the HRegionInfo of this region
    */
-  public HRegionInfo getRegionInfo();
+  HRegionInfo getRegionInfo();
 
   /**
    * Flushes the cache.
@@ -57,21 +58,33 @@ public interface HRegionIf {
    *
    * @return true if cache was flushed
    */
-  public boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
+  boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
       throws IOException;
 
   /**
    * @return how info about the last flushes <time, size>
    */
-  public List<Pair<Long, Long>> getRecentFlushInfo();
+  List<Pair<Long, Long>> getRecentFlushInfo();
 
   /**
    * @return True if this region has references.
    */
-  public boolean hasReferences();
+  boolean hasReferences();
 
   /**
    * @return the maximum number of files among all stores.
    */
-  public int maxStoreFilesCount();
+  int maxStoreFilesCount();
+
+  /**
+   * Return an iterator that scans over the HRegion, returning the indicated
+   * columns and rows specified by the {@link Scan}.
+   * <p>
+   * This Iterator must be closed by the caller.
+   *
+   * @param scan configured {@link Scan}
+   * @return InternalScanner
+   * @throws IOException read exceptions
+   */
+  InternalScanner getScanner(Scan scan) throws IOException;
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Fri May  2 18:18:27 2014
@@ -569,6 +569,13 @@ public class Bytes {
   }
 
   /**
+   * Converts a non-null Long value to a byte array using big-endian.
+   */
+  public static byte[] toBytes(Long val) {
+    return toBytes(val.longValue());
+  }
+
+  /**
    * Converts a byte array to a long value. Reverses
    * {@link #toBytes(long)}
    * @param bytes array
@@ -807,7 +814,36 @@ public class Bytes {
   }
 
   /**
+   * Converts a char value to a byte array of {@link #SIZEOF_CHAR} bytes long.
+   *
+   * @param val value, narrowed to a short before being encoded
+   * @return the byte array
+   */
+  public static byte[] toBytes(char val) {
+    return toBytes((short) val);
+  }
+
+  /**
+   * Converts a Character value to a byte array of {@link #SIZEOF_CHAR} bytes
+   * long.
+   *
+   * @param val value; must not be null since it is unboxed
+   * @return the byte array
+   */
+  public static byte[] toBytes(Character val) {
+    return toBytes(val.charValue());
+  }
+
+  /**
+   * Converts a byte array to a char value via toShort; reverses toBytes(char).
+   */
+  public static char toChar(byte[] bytes) {
+    return (char) Bytes.toShort(bytes);
+  }
+
+  /**
    * Convert a short value to a byte array of {@link #SIZEOF_SHORT} bytes long.
+   *
    * @param val value
    * @return the byte array
    */

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java Fri May  2 18:18:27 2014
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib.IAggregator;
 import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.StringBytes;
 import org.junit.After;
@@ -101,7 +101,7 @@ public class TestEndpoint {
 
     @Override
     public long sum(int offset) throws IOException {
-      HRegion region = context.getRegion();
+      HRegionIf region = context.getRegion();
       Scan scan = new Scan();
       scan.addFamily(FAMILY_NAME);
       scan.addColumn(FAMILY_NAME, QUALITY_NAME);

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java?rev=1591995&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java Fri May  2 18:18:27 2014
@@ -0,0 +1,129 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.StringBytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Testcases for endpoints defined in LongAggregators.
+ */
+public class TestLongAggregator {
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
+  private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
+        LongAggregator.Factory.class.getName());
+
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCall() throws Exception {
+    final StringBytes TABLE_NAME = new StringBytes("testCall");
+    // Create the table
+    HTableInterface table = TEST_UTIL.createTable(TABLE_NAME, FAMILY_NAME);
+
+    final byte[] PREFIX = new byte[] { 'f', 'b' };
+
+    // Put rows row1..row10; each value is PREFIX followed by the long i.
+    for (int i = 1; i <= 10; i++) {
+      table.put(new Put(Bytes.toBytes("row" + i)).add(FAMILY_NAME,
+          QUALITY_NAME, Bytes.add(PREFIX, Bytes.toBytes((long) i))));
+    }
+
+    // Call sum on every region; the long starts right after PREFIX.
+    IEndpointClient cp = (IEndpointClient) table;
+    Map<HRegionInfo, Long> results =
+        cp.coprocessorEndpoint(ILongAggregator.class, null, null,
+            new Caller<ILongAggregator, Long>() {
+              @Override
+              public Long call(ILongAggregator client) throws IOException {
+                return client.sum(FAMILY_NAME, null, PREFIX.length);
+              }
+            });
+
+    // Add up the per-region sums.
+    long sum = 0;
+    for (Long res : results.values()) {
+      sum += res;
+    }
+
+    // 1 + 2 + ... + 10 = 55.
+    Assert.assertEquals("sum", 55, sum);
+
+    results =
+        cp.coprocessorEndpoint(ILongAggregator.class, null, null,
+            new Caller<ILongAggregator, Long>() {
+              @Override
+              public Long call(ILongAggregator client) throws IOException {
+                return client.max(FAMILY_NAME, null, PREFIX.length);
+              }
+            });
+
+    // Take the largest of the per-region maxima.
+    long max = Long.MIN_VALUE;
+    for (Long res : results.values()) {
+      max = Math.max(max, res);
+    }
+
+    // The largest value written is 10.
+    Assert.assertEquals("max", 10, max);
+
+    results =
+        cp.coprocessorEndpoint(ILongAggregator.class, null, null,
+            new Caller<ILongAggregator, Long>() {
+              @Override
+              public Long call(ILongAggregator client) throws IOException {
+                return client.min(FAMILY_NAME, null, PREFIX.length);
+              }
+            });
+
+    // Take the smallest of the per-region minima.
+    long min = Long.MAX_VALUE;
+    for (Long res : results.values()) {
+      min = Math.min(min, res);
+    }
+
+    // The smallest value written is 1.
+    Assert.assertEquals("min", 1, min);
+  }
+}