You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/02 20:18:27 UTC
svn commit: r1591995 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/coprocessor/endpoints/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/util/ test/java/org...
Author: liyin
Date: Fri May 2 18:18:27 2014
New Revision: 1591995
URL: http://svn.apache.org/r1591995
Log:
[master] Add LongAggregator endpoint.
Author: daviddeng
Summary:
Implements min/max/sum fuctions.
Some refactoring.
Test Plan: `TestLongAggregator`
Reviewers: adela, manukranthk, gauravm
Reviewed By: adela
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1297685
Task ID: 4223215
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointLib.java Fri May 2 18:18:27 2014
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
/**
@@ -44,7 +44,7 @@ public class EndpointLib {
/**
* Aggregates all KeyValue's in a region defined by a Scan.
*/
- public static void aggregateScan(HRegion region, Scan scan, IAggregator aggr)
+ public static void aggregateScan(HRegionIf region, Scan scan, IAggregator aggr)
throws IOException {
try (InternalScanner scanner = region.getScanner(scan)) {
ArrayList<KeyValue> kvs = new ArrayList<>();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Fri May 2 18:18:27 2014
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager.EndpointInfo;
import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
import org.apache.hadoop.hbase.regionserver.HRegionServerIf;
/**
@@ -62,7 +62,7 @@ public class EndpointServer implements I
// Set the context.
ep.setContext(new IEndpointContext() {
@Override
- public HRegion getRegion() throws NotServingRegionException {
+ public HRegionIf getRegion() throws NotServingRegionException {
return EndpointServer.this.server.getRegion(regionName);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpoint.java Fri May 2 18:18:27 2014
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.coprocessor.endpoints;
+import java.io.IOException;
+
/**
* The common parent of all endpoint interfaces.
*/
@@ -31,5 +33,5 @@ public interface IEndpoint {
* @param a
* non-null IEndpointContext
*/
- void setContext(IEndpointContext context);
+ void setContext(IEndpointContext context) throws IOException;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointContext.java Fri May 2 18:18:27 2014
@@ -20,7 +20,7 @@
package org.apache.hadoop.hbase.coprocessor.endpoints;
import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
/**
* The context of an endpoint calling.
@@ -34,7 +34,7 @@ public interface IEndpointContext {
* @throws NotServingRegionException
* if the region is not served on this server.
*/
- HRegion getRegion() throws NotServingRegionException;
+ HRegionIf getRegion() throws NotServingRegionException;
/**
* The start row, inclusive, within this region of this call.
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java?rev=1591995&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ILongAggregator.java Fri May 2 18:18:27 2014
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.io.IOException;
+
+/**
+ * The endpoint that aggregates values.
+ */
+public interface ILongAggregator extends IEndpoint {
+ /**
+ * Sum of the values.
+ *
+ * @param family the column family name. All families if empty.
+ * @param qualifier the column name. All columns if empty.
+ * @param offset the offset of the long in the value in bytes.
+ */
+ public long sum(byte[] family, byte[] qualifier, int offset)
+ throws IOException;
+
+ /**
+ * Min of the values. If no values found, Long.MAX_VALUE is returned.
+ *
+ * @param family the column family name. All families if empty.
+ * @param qualifier the column name. All columns if empty.
+ * @param offset the offset of the long in the value in bytes.
+ */
+ public long min(byte[] family, byte[] qualifier, int offset)
+ throws IOException;
+
+ /**
+ * Max of the values.If no values found, Long.MIN_VALUE is returned.
+ *
+ * @param family the column family name. All families if empty.
+ * @param qualifier the column name. All columns if empty.
+ * @param offset the offset of the long in the value in bytes.
+ */
+ public long max(byte[] family, byte[] qualifier, int offset)
+ throws IOException;
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java?rev=1591995&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java Fri May 2 18:18:27 2014
@@ -0,0 +1,146 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implementation of ILongAggregator for aggregation of sum/min/max
+ */
+public class LongAggregator implements ILongAggregator {
+
+ /**
+ * The factory of ILongAggregator
+ */
+ public static class Factory implements IEndpointFactory<ILongAggregator> {
+ @Override
+ public ILongAggregator create() {
+ return new LongAggregator();
+ }
+
+ @Override
+ public Class<ILongAggregator> getEndpointInterface() {
+ return ILongAggregator.class;
+ }
+ }
+
+ private HRegionIf region;
+ @Override
+ public void setContext(IEndpointContext context) throws IOException {
+ this.region = context.getRegion();
+ }
+
+ private interface IUpdater {
+ /**
+ * Updates current with newValue and returns the result.
+ */
+ long update(long current, long newValue);
+ }
+
+ /**
+ * Aggregates with a specified updater.
+ *
+ * @param offset the offset of the long in the value in bytes.
+ * @param initValue the initial value for the aggregating variable.
+ */
+ private long cacluate(byte[] family, byte[] qualifier, int offset,
+ IUpdater updater, long initValue) throws IOException {
+ Scan.Builder builder = new Scan.Builder();
+ if (family != null) {
+ if (qualifier == null) {
+ builder.addColumn(family, qualifier);
+ } else {
+ builder.addFamily(family);
+ }
+ }
+
+ try (InternalScanner scanner = this.region.getScanner(builder.create())) {
+
+ long res = initValue;
+
+ List<KeyValue> kvs = new ArrayList<>();
+ boolean hasMore = true;
+ while (hasMore) {
+ // fetch key-values.
+ kvs.clear();
+ hasMore = scanner.next(kvs);
+
+ for (KeyValue kv : kvs) {
+ // check bound
+ if (offset + Bytes.LONG_BYTES > kv.getValueLength()) {
+ throw new IndexOutOfBoundsException();
+ }
+ // decode the value
+ long vl = Bytes.toLong(kv.getBuffer(), kv.getValueOffset() + offset);
+ // aggregate
+ res = updater.update(res, vl);
+ }
+ }
+ return res;
+ }
+ }
+
+ private static final IUpdater SUM_UPDATER = new IUpdater() {
+ @Override
+ public long update(long current, long newValue) {
+ return current + newValue;
+ }
+ };
+
+ @Override
+ public long sum(byte[] family, byte[] qualifier, int offset)
+ throws IOException {
+ return cacluate(family, qualifier, offset, SUM_UPDATER, 0L);
+ }
+
+ private static final IUpdater MIN_UPDATER = new IUpdater() {
+ @Override
+ public long update(long current, long newValue) {
+ return Math.min(current, newValue);
+ }
+ };
+
+ @Override
+ public long min(byte[] family, byte[] qualifier, int offset)
+ throws IOException {
+ return cacluate(family, qualifier, offset, MIN_UPDATER, Long.MAX_VALUE);
+ }
+
+ private static final IUpdater MAX_UPDATER = new IUpdater() {
+ @Override
+ public long update(long current, long newValue) {
+ return Math.max(current, newValue);
+ }
+ };
+
+ @Override
+ public long max(byte[] family, byte[] qualifier, int offset)
+ throws IOException {
+ return cacluate(family, qualifier, offset, MAX_UPDATER, Long.MIN_VALUE);
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java Fri May 2 18:18:27 2014
@@ -60,17 +60,6 @@ import com.google.common.util.concurrent
public interface ThriftHRegionInterface extends ThriftClientInterface,
IEndpointService, IRegionScanService {
- /**
- * Opens a scanner, optionally returns some data if numberOfRows > 0.
- *
- * @param regionName the name of the region to scan
- * @param scan the Scan instance defining scan query.
- * @param numberOfRows maximum number of rows to return after successfully
- * open the scanner.
- * @return the result as a ScannerResult.
- * The length of the Result list of the return value could be empty
- * and EOR is set to true for sure in this case.
- */
@Override
@ThriftMethod(value = "scanOpen", exception = {
@ThriftException(type = ThriftHBaseException.class, id = 1) })
@@ -79,15 +68,6 @@ public interface ThriftHRegionInterface
@ThriftField(name = "numberOfRows") int numberOfRows)
throws ThriftHBaseException;
- /**
- * Returns next scanning results.
- *
- * @param ID the ID of the scanner
- * @param numberOfRows maximum number of rows to return,
- * @return the result as a ScannerResult.
- * The length of the Result list of the return value could be empty
- * and EOR is set to true for sure in this case.
- */
@Override
@ThriftMethod(value = "scanNext", exception = {
@ThriftException(type = ThriftHBaseException.class, id = 1) })
@@ -95,12 +75,6 @@ public interface ThriftHRegionInterface
@ThriftField(name = "numberOfRows") int numberOfRows)
throws ThriftHBaseException;
- /**
- * Closes the scanner on the server side.
- *
- * @param id the ID of the scanner to close
- * @return true if a scanner is closed. false if the scanner doesn't exist.
- */
@Override
@ThriftMethod(value = "scanClose", exception = {
@ThriftException(type = ThriftHBaseException.class, id = 1) })
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri May 2 18:18:27 2014
@@ -1789,16 +1789,7 @@ public class HRegion implements HeapSize
}
}
- /**
- * Return an iterator that scans over the HRegion, returning the indicated
- * columns and rows specified by the {@link Scan}.
- * <p>
- * This Iterator must be closed by the caller.
- *
- * @param scan configured {@link Scan}
- * @return InternalScanner
- * @throws IOException read exceptions
- */
+ @Override
public InternalScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java Fri May 2 18:18:27 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -34,7 +35,7 @@ public interface HRegionIf {
/**
* @return the HRegionInfo of this region
*/
- public HRegionInfo getRegionInfo();
+ HRegionInfo getRegionInfo();
/**
* Flushes the cache.
@@ -57,21 +58,33 @@ public interface HRegionIf {
*
* @return true if cache was flushed
*/
- public boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
+ boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
throws IOException;
/**
* @return how info about the last flushes <time, size>
*/
- public List<Pair<Long, Long>> getRecentFlushInfo();
+ List<Pair<Long, Long>> getRecentFlushInfo();
/**
* @return True if this region has references.
*/
- public boolean hasReferences();
+ boolean hasReferences();
/**
* @return the maximum number of files among all stores.
*/
- public int maxStoreFilesCount();
+ int maxStoreFilesCount();
+
+ /**
+ * Return an iterator that scans over the HRegion, returning the indicated
+ * columns and rows specified by the {@link Scan}.
+ * <p>
+ * This Iterator must be closed by the caller.
+ *
+ * @param scan configured {@link Scan}
+ * @return InternalScanner
+ * @throws IOException read exceptions
+ */
+ InternalScanner getScanner(Scan scan) throws IOException;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Fri May 2 18:18:27 2014
@@ -569,6 +569,13 @@ public class Bytes {
}
/**
+ * Converts a Long value to a byte array using big-endian.
+ */
+ public static byte[] toBytes(Long val) {
+ return toBytes(val.longValue());
+ }
+
+ /**
* Converts a byte array to a long value. Reverses
* {@link #toBytes(long)}
* @param bytes array
@@ -807,7 +814,36 @@ public class Bytes {
}
/**
+ * Converts a char value to a byte array of {@link #SIZEOF_CHAR} bytes long.
+ *
+ * @param val value
+ * @return the byte array
+ */
+ public static byte[] toBytes(char val) {
+ return toBytes((short) val);
+ }
+
+ /**
+ * Converts a Character value to a byte array of {@link #SIZEOF_CHAR} bytes
+ * long.
+ *
+ * @param val value
+ * @return the byte array
+ */
+ public static byte[] toBytes(Character val) {
+ return toBytes(val.charValue());
+ }
+
+ /**
+ * Converts a byte array to a char value
+ */
+ public static char toChar(byte[] bytes) {
+ return (char) Bytes.toShort(bytes);
+ }
+
+ /**
* Convert a short value to a byte array of {@link #SIZEOF_SHORT} bytes long.
+ *
* @param val value
* @return the byte array
*/
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java?rev=1591995&r1=1591994&r2=1591995&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java Fri May 2 18:18:27 2014
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib.IAggregator;
import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionIf;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.StringBytes;
import org.junit.After;
@@ -101,7 +101,7 @@ public class TestEndpoint {
@Override
public long sum(int offset) throws IOException {
- HRegion region = context.getRegion();
+ HRegionIf region = context.getRegion();
Scan scan = new Scan();
scan.addFamily(FAMILY_NAME);
scan.addColumn(FAMILY_NAME, QUALITY_NAME);
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java?rev=1591995&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java Fri May 2 18:18:27 2014
@@ -0,0 +1,129 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.StringBytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Testcases for endpoints defined in LongAggregators.
+ */
+public class TestLongAggregator {
+ private static final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
+ private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
+
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
+ LongAggregator.Factory.class.getName());
+
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCall() throws Exception {
+ final StringBytes TABLE_NAME = new StringBytes("testCall");
+ // Create the table
+ HTableInterface table = TEST_UTIL.createTable(TABLE_NAME, FAMILY_NAME);
+
+ final byte[] PREFIX = new byte[] { 'f', 'b' };
+
+ // Put some values
+ for (int i = 1; i <= 10; i++) {
+ table.put(new Put(Bytes.toBytes("row" + i)).add(FAMILY_NAME,
+ QUALITY_NAME, Bytes.add(PREFIX, Bytes.toBytes((long) i))));
+ }
+
+ // Calling endpoints.
+ IEndpointClient cp = (IEndpointClient) table;
+ Map<HRegionInfo, Long> results =
+ cp.coprocessorEndpoint(ILongAggregator.class, null, null,
+ new Caller<ILongAggregator, Long>() {
+ @Override
+ public Long call(ILongAggregator client) throws IOException {
+ return client.sum(FAMILY_NAME, null, PREFIX.length);
+ }
+ });
+
+ // Aggregates results from all regions
+ long sum = 0;
+ for (Long res : results.values()) {
+ sum += res;
+ }
+
+ // Check the final results
+ Assert.assertEquals("sum", 55, sum);
+
+ results =
+ cp.coprocessorEndpoint(ILongAggregator.class, null, null,
+ new Caller<ILongAggregator, Long>() {
+ @Override
+ public Long call(ILongAggregator client) throws IOException {
+ return client.max(FAMILY_NAME, null, PREFIX.length);
+ }
+ });
+
+ // Aggregates results from all regions
+ long max = Long.MIN_VALUE;
+ for (Long res : results.values()) {
+ max = Math.max(max, res);
+ }
+
+ // Check the final results
+ Assert.assertEquals("max", 10, max);
+
+ results =
+ cp.coprocessorEndpoint(ILongAggregator.class, null, null,
+ new Caller<ILongAggregator, Long>() {
+ @Override
+ public Long call(ILongAggregator client) throws IOException {
+ return client.min(FAMILY_NAME, null, PREFIX.length);
+ }
+ });
+
+ // Aggregates results from all regions
+ long min = Long.MAX_VALUE;
+ for (Long res : results.values()) {
+ min = Math.min(min, res);
+ }
+
+ // Check the final results
+ Assert.assertEquals("min", 1, min);
+ }
+}