You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/07 12:30:52 UTC
[3/8] incubator-eagle git commit: [EAGLE-520] Fix and decouple
co-processor from eagle aggregation query service
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
index 5835738..0e92c64 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateClientImpl.java
@@ -18,8 +18,8 @@ package org.apache.eagle.storage.hbase.query.coprocessor.impl;
import org.apache.eagle.log.entity.meta.EntityDefinition;
import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
import org.apache.eagle.storage.hbase.query.coprocessor.*;
+import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -32,115 +32,111 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Not thread safe
- *
- * @since : 11/2/14,2014
+ * Not thread safe.
*/
public class AggregateClientImpl implements AggregateClient {
- private final static Logger LOG = LoggerFactory.getLogger(AggregateClient.class);
- private AggregateResultCallback callback;
+ private static final Logger LOG = LoggerFactory.getLogger(AggregateClient.class);
+ private AggregateResultCallback callback;
- private void checkNotNull(Object obj,String name) {
- if(obj==null) throw new NullPointerException(name+" is null");
- }
+ private void checkNotNull(Object obj, String name) {
+ if (obj == null) {
+ throw new NullPointerException(name + " is null");
+ }
+ }
- @Override
- public AggregateResult aggregate(final HTableInterface table,
- final EntityDefinition entityDefinition,
- final Scan scan,
- final List<String> groupbyFields,
- final List<AggregateFunctionType> aggregateFuncTypes,
- final List<String> aggregatedFields,
- final boolean timeSeries,
- final long startTime,
- final long endTime,
- final long intervalMin) throws IOException {
- checkNotNull(entityDefinition,"entityDefinition");
- final List<AggregateFunctionType> _aggregateFuncTypes = convertToCoprocessorAggregateFunc(aggregateFuncTypes);
- final List<byte[]> _aggregateFuncTypesBytes = AggregateFunctionType.toBytesList(_aggregateFuncTypes);
-// if(timeSeries) TimeSeriesAggregator.validateTimeRange(startTime,endTime,intervalMin);
- callback = new AggregateResultCallbackImpl(aggregateFuncTypes);
- try{
- if(!LOG.isDebugEnabled()){
- LOG.info("Going to exec coprocessor: "+AggregateProtocol.class.getSimpleName());
- }else{
- LOG.debug("Going to exec coprocessor: "+AggregateProtocol.class.getName());
- }
+ @Override
+ public AggregateResult aggregate(final HTableInterface table,
+ final EntityDefinition entityDefinition,
+ final Scan scan,
+ final List<String> groupbyFields,
+ final List<AggregateFunctionType> aggregateFuncTypes,
+ final List<String> aggregatedFields,
+ final boolean timeSeries,
+ final long startTime,
+ final long endTime,
+ final long intervalMin) throws IOException {
+ checkNotNull(entityDefinition, "entityDefinition");
+ final List<AggregateFunctionType> _aggregateFuncTypes = convertToCoprocessorAggregateFunc(aggregateFuncTypes);
+ final List<byte[]> _aggregateFuncTypesBytes = AggregateFunctionType.toBytesList(_aggregateFuncTypes);
+ // if(timeSeries) TimeSeriesAggregator.validateTimeRange(startTime,endTime,intervalMin);
+ callback = new AggregateResultCallbackImpl(aggregateFuncTypes);
+ try {
+ if (!LOG.isDebugEnabled()) {
+ LOG.info("Going to exec coprocessor: " + AggregateProtocol.class.getSimpleName());
+ } else {
+ LOG.debug("Going to exec coprocessor: " + AggregateProtocol.class.getName());
+ }
-// table.coprocessorExec(AggregateProtocol.class,scan.getStartRow(),scan.getStopRow(),new Batch.Call<AggregateProtocol, AggregateResult>(){
-// @Override
-// public AggregateResult call(AggregateProtocol instance) throws IOException {
-// if(timeSeries){
-// return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields,startTime,endTime,intervalMin);
-// }else{
-// return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
-// }
-// }
-// },callback);
+ // table.coprocessorExec(AggregateProtocol.class,scan.getStartRow(),scan.getStopRow(),new Batch.Call<AggregateProtocol, AggregateResult>(){
+ // @Override
+ // public AggregateResult call(AggregateProtocol instance) throws IOException {
+ // if(timeSeries){
+ // return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields,startTime,endTime,intervalMin);
+ // }else{
+ // return instance.aggregate(entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
+ // }
+ // }
+ // },callback);
- table.coprocessorService(AggregateProtos.AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(), new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
- @Override
- public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
- BlockingRpcCallback<AggregateProtos.AggregateResult> rpcCallback = new BlockingRpcCallback<AggregateProtos.AggregateResult>();
- if(timeSeries){
- AggregateProtos.TimeSeriesAggregateRequest timeSeriesAggregateRequest = ProtoBufConverter
- .toPBTimeSeriesRequest(
- entityDefinition,
- scan,
- groupbyFields,
- _aggregateFuncTypesBytes,
- aggregatedFields,
- startTime,
- endTime,
- intervalMin);
- instance.timeseriesAggregate(null, timeSeriesAggregateRequest, rpcCallback);
- return rpcCallback.get();
- }else{
- AggregateProtos.AggregateRequest aggregateRequest = ProtoBufConverter.toPBRequest(
- entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
- instance.aggregate(null, aggregateRequest, rpcCallback);
- return rpcCallback.get();
- }
- }
- }, callback);
- } catch (Throwable t){
- LOG.error(t.getMessage(),t);
- throw new IOException(t);
- }
- return callback.result();
- }
-
-// @Override
-// public void result(final GroupbyKeyValueCreationListener[] listeners) {
-// callback.asyncRead(Arrays.asList(listeners));
-// }
+ table.coprocessorService(AggregateProtos.AggregateProtocol.class,
+ scan.getStartRow(), scan.getStopRow(), new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
+ @Override
+ public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
+ BlockingRpcCallback<AggregateProtos.AggregateResult> rpcCallback = new BlockingRpcCallback<>();
+ if (timeSeries) {
+ AggregateProtos.TimeSeriesAggregateRequest timeSeriesAggregateRequest = ProtoBufConverter
+ .toPBTimeSeriesRequest(
+ entityDefinition,
+ scan,
+ groupbyFields,
+ _aggregateFuncTypesBytes,
+ aggregatedFields,
+ startTime,
+ endTime,
+ intervalMin);
+ instance.timeseriesAggregate(null, timeSeriesAggregateRequest, rpcCallback);
+ return rpcCallback.get();
+ } else {
+ AggregateProtos.AggregateRequest aggregateRequest = ProtoBufConverter.toPBRequest(
+ entityDefinition, scan, groupbyFields, _aggregateFuncTypesBytes, aggregatedFields);
+ instance.aggregate(null, aggregateRequest, rpcCallback);
+ return rpcCallback.get();
+ }
+ }
+ }, callback);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ throw new IOException(t);
+ }
+ return callback.result();
+ }
- @Override
- public AggregateResult aggregate(HTableInterface table, EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
- return this.aggregate(table,entityDefinition,scan,groupbyFields,aggregateFuncTypes,aggregatedFields,false,0,0,0);
- }
+ @Override
+ public AggregateResult aggregate(HTableInterface table, EntityDefinition entityDefinition, Scan scan,
+ List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
+ return this.aggregate(table, entityDefinition, scan, groupbyFields, aggregateFuncTypes, aggregatedFields, false, 0, 0, 0);
+ }
- /**
- *
- * <h4>
- * Convert client side funcs to server side funcs, especially for <b>avg</b>
- * </h4>
- * <ul>
- * <li><b>avg</b>:
- * Coprocessor[ <b><sum,count></b>] => Callback[(sum<SUB>1</SUB>+sum<SUB>2</SUB>+...+sum<SUB>n</SUB>)/(count<SUB>1</SUB>+count<SUB>2</SUB>+...+count<SUB>n</SUB>)]
- * </li>
- * </ul>
- * @param funcs List<AggregateFunctionType>
- * @return
- */
- private List<AggregateFunctionType> convertToCoprocessorAggregateFunc(List<AggregateFunctionType> funcs){
- List<AggregateFunctionType> copy = new ArrayList<AggregateFunctionType>(funcs);
- for(int i=0;i<funcs.size();i++){
- AggregateFunctionType func = copy.get(i);
- if(AggregateFunctionType.avg.equals(func)){
- copy.set(i,AggregateFunctionType.sum);
- }
- }
- return copy;
- }
+ /**
+ * <h4>
+ * Convert client side funcs to server side funcs, especially for <b>avg</b>
+ * </h4>
+ * <ul>
+ * <li><b>avg</b>:
+ * Coprocessor[ <b><sum,count></b>] => Callback[(sum<SUB>1</SUB>+sum<SUB>2</SUB>+...+sum<SUB>n</SUB>)/(count<SUB>1</SUB>+count<SUB>2</SUB>+...+count<SUB>n</SUB>)]
+ * </li>
+ * </ul>
+ *
+ * @param funcs List<AggregateFunctionType>
+ */
+ private List<AggregateFunctionType> convertToCoprocessorAggregateFunc(List<AggregateFunctionType> funcs) {
+ List<AggregateFunctionType> copy = new ArrayList<AggregateFunctionType>(funcs);
+ for (int i = 0; i < funcs.size(); i++) {
+ AggregateFunctionType func = copy.get(i);
+ if (AggregateFunctionType.avg.equals(func)) {
+ copy.set(i, AggregateFunctionType.sum);
+ }
+ }
+ return copy;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
index 2e0248f..5d7011f 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/impl/AggregateResultCallbackImpl.java
@@ -16,13 +16,13 @@
*/
package org.apache.eagle.storage.hbase.query.coprocessor.impl;
+import org.apache.eagle.common.ByteUtil;
import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.raw.*;
import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
import org.apache.eagle.storage.hbase.query.coprocessor.ProtoBufConverter;
import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import org.apache.eagle.common.ByteUtil;
-import org.apache.eagle.query.aggregate.raw.*;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,111 +33,113 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-/**
- * @since : 11/3/14,2014
- */
public class AggregateResultCallbackImpl implements AggregateResultCallback {
- private final static Logger LOG = LoggerFactory.getLogger(AggregateResultCallback.class);
- private Map<GroupbyKey,List<Function>> groupedFuncs = new HashMap<GroupbyKey, List<Function>>();
- private List<FunctionFactory> functionFactories = new ArrayList<FunctionFactory>();
- private int numFuncs = 0;
- private long kvCounter = 0;
- private int regionCounter = 0;
- private long startTimestamp;
- private long stopTimestamp;
-
- // Start RPC call time (i.e constructor initialized time)
- private final long _start;
-
- public AggregateResultCallbackImpl(List<AggregateFunctionType> aggregateFunctionTypes){
- this.numFuncs = aggregateFunctionTypes.size();
- for(AggregateFunctionType type: aggregateFunctionTypes){
- functionFactories.add(FunctionFactory.locateFunctionFactory(type));
- }
- this._start = System.currentTimeMillis();
- }
+ private static final Logger LOG = LoggerFactory.getLogger(AggregateResultCallback.class);
+ private Map<GroupbyKey, List<Function>> groupedFuncs = new HashMap<GroupbyKey, List<Function>>();
+ private List<FunctionFactory> functionFactories = new ArrayList<FunctionFactory>();
+ private int numFuncs = 0;
+ private long kvCounter = 0;
+ private int regionCounter = 0;
+ private long startTimestamp;
+ private long stopTimestamp;
-// @Override
- public void update(byte[] region, byte[] row, AggregateResult result) {
- AggregateResult _result = result;
- regionCounter ++;
- kvCounter += _result.getKeyValues().size();
- if(this.startTimestamp == 0 || this.startTimestamp > _result.getStartTimestamp()){
- this.startTimestamp = _result.getStartTimestamp();
- }
- if(this.stopTimestamp == 0 || this.stopTimestamp < _result.getStopTimestamp()){
- this.stopTimestamp = _result.getStopTimestamp();
- }
- for(GroupbyKeyValue keyValue:_result.getKeyValues()){
- update(keyValue);
- }
- }
+ // Start RPC call time (i.e constructor initialized time)
+ private final long _start;
- public void update(GroupbyKeyValue keyValue) {
- // Incr kvCounter if call #update(GroupbyKeyValue) directly
- // instead of #update(byte[] region, byte[] row, AggregateResult result)
- if(this.getKVCounter() == 0) this.kvCounter ++;
- // Accumulate key value for GroubyKey mapped Functions
- GroupbyKey groupedKey = keyValue.getKey();
- List<Function> funcs = groupedFuncs.get(groupedKey);
- if(funcs==null){
- funcs = new ArrayList<Function>();
- for(FunctionFactory functionFactory:this.functionFactories){
- funcs.add(functionFactory.createFunction());
- }
- groupedFuncs.put(groupedKey, funcs);
- }
- for(int i=0;i<this.numFuncs;i++){
- int intCount = 1;
- byte[] count = keyValue.getValue().getMeta(i).getBytes();
- if(count != null){
- intCount = ByteUtil.bytesToInt(count);
- }
- funcs.get(i).run(keyValue.getValue().get(i).get(), intCount);
- }
- }
+ public AggregateResultCallbackImpl(List<AggregateFunctionType> aggregateFunctionTypes) {
+ this.numFuncs = aggregateFunctionTypes.size();
+ for (AggregateFunctionType type : aggregateFunctionTypes) {
+ functionFactories.add(FunctionFactory.locateFunctionFactory(type));
+ }
+ this._start = System.currentTimeMillis();
+ }
- public long getKVCounter(){
- return this.kvCounter;
- }
+ public long getKVCounter() {
+ return this.kvCounter;
+ }
- public long getRegionCounter(){
- return this.regionCounter;
- }
+ public long getRegionCounter() {
+ return this.regionCounter;
+ }
- public AggregateResult result(){
- List<GroupbyKeyValue> mergedKeyValues = new ArrayList<GroupbyKeyValue>();
- for(Map.Entry<GroupbyKey,List<Function>> entry:this.groupedFuncs.entrySet()){
- GroupbyValue value = new GroupbyValue(this.numFuncs);
- for(Function func:entry.getValue()){
- double _result = func.result();
- int _count = func.count();
- value.add(_result);
- value.addMeta(_count);
- }
- mergedKeyValues.add(new GroupbyKeyValue(entry.getKey(),value));
- }
-
- final long _stop = System.currentTimeMillis();
- if(this.getRegionCounter() > 0) {
- LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, regions = %d, , spend = %d ms", mergedKeyValues.size(),this.startTimestamp,this.stopTimestamp, this.getKVCounter(), this.getRegionCounter(),(_stop - _start)));
- }else{
- LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, spend = %d ms", mergedKeyValues.size(),this.startTimestamp,this.stopTimestamp,this.getKVCounter(), (_stop - _start)));
- }
- AggregateResult result = new AggregateResult();
- result.setKeyValues(mergedKeyValues);
- result.setStartTimestamp(this.startTimestamp);
- result.setStopTimestamp(this.stopTimestamp);
- return result;
- }
+ public AggregateResult result() {
+ List<GroupbyKeyValue> mergedKeyValues = new ArrayList<GroupbyKeyValue>();
+ for (Map.Entry<GroupbyKey, List<Function>> entry : this.groupedFuncs.entrySet()) {
+ GroupbyValue value = new GroupbyValue(this.numFuncs);
+ for (Function func : entry.getValue()) {
+ double _result = func.result();
+ int _count = func.count();
+ value.add(_result);
+ value.addMeta(_count);
+ }
+ mergedKeyValues.add(new GroupbyKeyValue(entry.getKey(), value));
+ }
+
+ final long _stop = System.currentTimeMillis();
+ if (this.getRegionCounter() > 0) {
+ LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, regions = %d, , spend = %d ms",
+ mergedKeyValues.size(), this.startTimestamp, this.stopTimestamp, this.getKVCounter(), this.getRegionCounter(), (_stop - _start)));
+ } else {
+ LOG.info(String.format("result = %d rows, startTime = %d, endTime = %d, source = %d rows, spend = %d ms",
+ mergedKeyValues.size(), this.startTimestamp, this.stopTimestamp, this.getKVCounter(), (_stop - _start)));
+ }
+ AggregateResult result = new AggregateResult();
+ result.setKeyValues(mergedKeyValues);
+ result.setStartTimestamp(this.startTimestamp);
+ result.setStopTimestamp(this.stopTimestamp);
+ return result;
+ }
@Override
public void update(byte[] region, byte[] row, AggregateProtos.AggregateResult result) {
try {
- if(result == null) throw new IllegalStateException(new CoprocessorException("result is null"));
- this.update(region,row, ProtoBufConverter.fromPBResult(result));
+ if (result == null) {
+ throw new IllegalStateException(new CoprocessorException("result is null"));
+ }
+ this.update(region, row, ProtoBufConverter.fromPBResult(result));
} catch (IOException e) {
- LOG.error("Failed to convert PB-Based message",e);
+ LOG.error("Failed to convert PB-Based message", e);
+ }
+ }
+
+ public void update(GroupbyKeyValue keyValue) {
+ // Incr kvCounter if call #update(GroupbyKeyValue) directly
+ // instead of #update(byte[] region, byte[] row, AggregateResult result)
+ if (this.getKVCounter() == 0) {
+ this.kvCounter++;
+ }
+ // Accumulate key value for GroubyKey mapped Functions
+ GroupbyKey groupedKey = keyValue.getKey();
+ List<Function> funcs = groupedFuncs.get(groupedKey);
+ if (funcs == null) {
+ funcs = new ArrayList<Function>();
+ for (FunctionFactory functionFactory : this.functionFactories) {
+ funcs.add(functionFactory.createFunction());
+ }
+ groupedFuncs.put(groupedKey, funcs);
+ }
+ for (int i = 0; i < this.numFuncs; i++) {
+ int intCount = 1;
+ byte[] count = keyValue.getValue().getMeta(i).getBytes();
+ if (count != null) {
+ intCount = ByteUtil.bytesToInt(count);
+ }
+ funcs.get(i).run(keyValue.getValue().get(i).get(), intCount);
+ }
+ }
+
+ public void update(byte[] region, byte[] row, AggregateResult result) {
+ AggregateResult _result = result;
+ regionCounter++;
+ kvCounter += _result.getKeyValues().size();
+ if (this.startTimestamp == 0 || this.startTimestamp > _result.getStartTimestamp()) {
+ this.startTimestamp = _result.getStartTimestamp();
+ }
+ if (this.stopTimestamp == 0 || this.stopTimestamp < _result.getStopTimestamp()) {
+ this.stopTimestamp = _result.getStopTimestamp();
+ }
+ for (GroupbyKeyValue keyValue : _result.getKeyValues()) {
+ update(keyValue);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
index 7d81872..09cf983 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/package-info.java
@@ -14,35 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
/**
- *
* <h1>Eagle Aggregation Coprocessor</h1>
- *
+ * <p>
* <h2>Deployment and Usage</h2>
* <ol>
- * <li>
- * Firstly deploy jar files to cluster on local file system or HDFS.<br/>
- * </li>
- * <li>
- * Secondly configure in <code>hbase-site.xml</code> as following:
- * <pre><property>
+ * <li>
+ * Firstly deploy jar files to cluster on local file system or HDFS.<br/>
+ * </li>
+ * <li>
+ * Secondly configure in <code>hbase-site.xml</code> as following:
+ * <pre><property>
* <name>hbase.coprocessor.region.classes</name>
* <value>AggregateProtocolEndPoint</value>
* </property>
- * </pre>
- * Or register on related hbase tables
- * <pre> hbase(main):005:0> alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'</pre>
- * </li>
- * <li>
+ * </pre>
+ * Or register on related hbase tables
+ * <pre> hbase(main):005:0> alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'</pre>
+ * </li>
+ * <li>
* <code>
* AggregateClient client = new AggregateClientImpl();
* client.aggregate
* AggregateResult result = client.result
- *
+ *
* </code>
* </li>
* </ol>
- *
+ *
* <h2>Performance</h2>
*
* <b>NOTE:</b>
@@ -56,56 +56,53 @@
* <b>A simple benchmark report for reference</b>
* <br/>
* <table border="1">
- * <thead>
- * <tr>
- * <th>Region Servers</th> <th>Record Count</th>
- * <th>Coprocessor</th><th>No-Coprocessor</th><th>Aggregation</th>
- * </tr>
- * </thead>
- * <tbody>
- * <tr>
- * <td rowspan="10">1</td><td rowspan="10">1000,000</td>
- * <td>10193 ms</td><td>21988 ms</t...@datacenter>{count}</td>
- * </tr>
- * <tr>
- * <td>10010 ms</td><td>22547 ms</t...@datacenter>{sum(numTotalMaps)}</td>
- * </tr>
- * <tr>
- * <td>10334 ms</td><td>23433 ms</t...@datacenter>{avg(numTotalMaps)}</td>
- * </tr>
- * <tr>
- * <td>10045 ms</td><td>22690 ms</t...@datacenter>{max(numTotalMaps)}</td>
- * </tr>
- * <tr>
- * <td>10190 ms</td><td>21902 ms</t...@datacenter>{min(numTotalMaps)}</td>
- * </tr>
- * </tbody>
+ * <thead>
+ * <tr>
+ * <th>Region Servers</th> <th>Record Count</th>
+ * <th>Coprocessor</th><th>No-Coprocessor</th><th>Aggregation</th>
+ * </tr>
+ * </thead>
+ * <tbody>
+ * <tr>
+ * <td rowspan="10">1</td><td rowspan="10">1000,000</td>
+ * <td>10193 ms</td><td>21988 ms</t...@datacenter>{count}</td>
+ * </tr>
+ * <tr>
+ * <td>10010 ms</td><td>22547 ms</t...@datacenter>{sum(numTotalMaps)}</td>
+ * </tr>
+ * <tr>
+ * <td>10334 ms</td><td>23433 ms</t...@datacenter>{avg(numTotalMaps)}</td>
+ * </tr>
+ * <tr>
+ * <td>10045 ms</td><td>22690 ms</t...@datacenter>{max(numTotalMaps)}</td>
+ * </tr>
+ * <tr>
+ * <td>10190 ms</td><td>21902 ms</t...@datacenter>{min(numTotalMaps)}</td>
+ * </tr>
+ * </tbody>
* </table>
* <h2>Reference</h2>
* <a href="https://blogs.apache.org/hbase/entry/coprocessor_introduction">
- * Coprocessor Introduction
+ * Coprocessor Introduction
* </a>
* (Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell)
- *
+ *
* <h2>TO-DO</h2>
* <ol>
* <li>
- * TODO: Pass writable self-described entity definition into HBase coprocessor instead of serviceName in String
+ * TODO: Pass writable self-described entity definition into HBase coprocessor instead of serviceName in String
*
- * Because using serviceName to get entity definition will reply on entity API code under eagle-app, so that
- * when modifying or creating new entities, we have to update coprocessor jar in HBase side
- * (hchen9@xyz.com)
+ * Because using serviceName to get entity definition will rely on entity API code under eagle-app, so that
+ * when modifying or creating new entities, we have to update coprocessor jar in HBase side
+ * (hchen9@xyz.com)
* </li>
* <li>
- * TODO: Using String.format instead substrings addition for long log to avoid recreating string objects
+ * TODO: Use String.format instead of substring concatenation for long log messages to avoid recreating string objects
* </li>
* </ol>
*
* </table>
- * @see eagle.query.aggregate.coprocessor.AggregateClient
- * @see eagle.query.aggregate.coprocessor.AggregateResult
- * @see eagle.query.aggregate.coprocessor.AggregateProtocol
- *
- * @since : 11/10/14,2014
+ *
+ * @since : 11/10/14,2014
*/
package org.apache.eagle.storage.hbase.query.coprocessor;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
index 1ee1c52..3f3f831 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/spi/HBaseStorageServiceProvider.java
@@ -20,11 +20,8 @@ import org.apache.eagle.storage.DataStorage;
import org.apache.eagle.storage.hbase.HBaseStorage;
import org.apache.eagle.storage.spi.DataStorageServiceProvider;
-/**
- * @since 3/20/15
- */
public final class HBaseStorageServiceProvider implements DataStorageServiceProvider {
- private final static String HBASE = "hbase";
+ private static final String HBASE = "hbase";
@Override
public String getType() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java
new file mode 100644
index 0000000..c5aecab
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/tools/CoprocessorTool.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.storage.hbase.tools;
+
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateProtocolEndPoint;
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Coprocessor CLI Tool.
+ *
+ * <p>Registers or unregisters {@link AggregateProtocolEndPoint} on one or more HBase tables.
+ * Registering uploads the coprocessor jar to the configured file system when it is not there
+ * yet, and replaces a registration that already exists on the table.
+ */
+public class CoprocessorTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CoprocessorTool.class);
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new CoprocessorTool(), args));
+    }
+
+    /**
+     * Removes the aggregation coprocessor from a single table.
+     *
+     * @param tableName name of the HBase table to modify
+     * @throws IOException if the coprocessor is not registered on the table, or an HBase call fails
+     */
+    private void unregisterCoprocessor(String tableName) throws IOException {
+        Configuration configuration = getConf();
+        TableName table = TableName.valueOf(tableName);
+        try (HBaseAdmin admin = new HBaseAdmin(configuration)) {
+            HTableDescriptor tableDescriptor = admin.getTableDescriptor(table);
+            LOGGER.info("Table {} found", tableName);
+            // Nothing to remove: report it instead of modifying the table for no reason.
+            if (!tableDescriptor.hasCoprocessor(AggregateProtocolEndPoint.class.getName())) {
+                LOGGER.warn("No coprocessor was registered on table '{}'", tableName);
+                throw new IOException("No coprocessor was registered on table " + tableName);
+            }
+            tableDescriptor.removeCoprocessor(AggregateProtocolEndPoint.class.getName());
+            admin.modifyTable(table, tableDescriptor);
+            LOGGER.info("Succeed to remove coprocessor from table " + tableName);
+        }
+    }
+
+    /**
+     * Registers the aggregation coprocessor on a single table, uploading the jar first when needed.
+     *
+     * @param jarPath      jar location, resolved against the URI of the configured file system
+     * @param tableName    name of the HBase table to modify
+     * @param localJarPath local jar to upload when {@code jarPath} does not exist; may be null
+     * @throws IOException if the jar is missing and no local jar was given, or a file system/HBase call fails
+     */
+    private void registerCoprocessor(String jarPath, String tableName, String localJarPath) throws IOException {
+        Configuration configuration = getConf();
+        try (FileSystem fs = FileSystem.get(configuration); HBaseAdmin admin = new HBaseAdmin(configuration)) {
+            Path path = new Path(fs.getUri() + Path.SEPARATOR + jarPath);
+            LOGGER.info("Checking path {} ... ", path.toString());
+            if (!fs.exists(path)) {
+                LOGGER.info("Path: {} not exist, uploading jar ...", path.toString());
+                if (localJarPath == null) {
+                    throw new IOException("local jar path is not given, please manually upload coprocessor jar onto hdfs at " + jarPath
+                        + " and retry, or provide local coprocessor jar path through CLI argument and upload automatically");
+                }
+                LOGGER.info("Copying from local {} to {}", localJarPath, jarPath);
+                fs.copyFromLocalFile(new Path(localJarPath), path);
+                LOGGER.info("Succeed to copied coprocessor jar to {}", path.toString());
+            } else {
+                LOGGER.info("Path {} already exists", path.toString());
+            }
+            LOGGER.info("Checking hbase table {}", tableName);
+            TableName table = TableName.valueOf(tableName);
+            HTableDescriptor tableDescriptor = admin.getTableDescriptor(table);
+            LOGGER.info("Table {} found", tableName);
+            if (tableDescriptor.hasCoprocessor(AggregateProtocolEndPoint.class.getName())) {
+                LOGGER.warn("Table '" + tableName + "' already registered coprocessor: " + AggregateProtocolEndPoint.class.getName() + ", removing firstly");
+                tableDescriptor.removeCoprocessor(AggregateProtocolEndPoint.class.getName());
+                admin.modifyTable(table, tableDescriptor);
+                tableDescriptor = admin.getTableDescriptor(table);
+            }
+            tableDescriptor.addCoprocessor(AggregateProtocolEndPoint.class.getName(),
+                path, Coprocessor.PRIORITY_USER, new HashMap<>());
+            admin.modifyTable(table, tableDescriptor);
+            LOGGER.info("Succeed to enable coprocessor on table " + tableName);
+        }
+    }
+
+    /** Prints command line usage for the given options. */
+    private void printHelpMessage(Options cmdOptions) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("java " + CoprocessorTool.class.getName() + " [--register/--unregister] [OPTIONS]", cmdOptions);
+    }
+
+    /**
+     * Parses the command line and registers or unregisters the coprocessor on every listed table.
+     *
+     * @param args command line arguments
+     * @return 0 on success, 1 when the table, the command, or the jar path needed for registering is missing
+     * @throws Exception if argument parsing fails or a table cannot be modified
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options cmdOptions = new Options();
+        cmdOptions.addOption(new Option("register", false, "Register coprocessor"));
+        cmdOptions.addOption(new Option("unregister", false, "Unregister coprocessor"));
+
+        cmdOptions.addOption("table", true, "HBase table name, separated with comma, for example, table1,table2,..");
+        cmdOptions.addOption("jar", true, "Coprocessor target jar path");
+        cmdOptions.addOption("localJar", true, "Coprocessor local source jar path");
+        cmdOptions.addOption("config", true, "Configuration file");
+
+        cmdOptions.getOption("table").setType(String.class);
+        cmdOptions.getOption("table").setRequired(true);
+        cmdOptions.getOption("jar").setType(String.class);
+        cmdOptions.getOption("jar").setRequired(false);
+        cmdOptions.getOption("localJar").setType(String.class);
+        cmdOptions.getOption("localJar").setRequired(false);
+        cmdOptions.getOption("config").setType(String.class);
+        cmdOptions.getOption("config").setRequired(false);
+
+        GnuParser parser = new GnuParser();
+        CommandLine cmdCli = parser.parse(cmdOptions, args);
+        String tableName = cmdCli.getOptionValue("table");
+        String configFile = cmdCli.getOptionValue("config");
+
+        if (configFile != null) {
+            Configuration.addDefaultResource(configFile);
+        }
+
+        // Guard explicitly: the required flag is set after the option was added, which the parser may not honor.
+        if (tableName == null) {
+            System.err.println("Error: table name is missing");
+            printHelpMessage(cmdOptions);
+            return 1;
+        }
+        // The table option may list several tables; both commands apply to each of them.
+        String[] tableNames = tableName.split(",\\s*");
+
+        if (cmdCli.hasOption("register")) {
+            String jarPath = cmdCli.getOptionValue("jar");
+            if (jarPath == null) {
+                // Counting raw arguments cannot tell whether the jar option was actually supplied.
+                System.err.println("Error: coprocessor jar path is missing");
+                printHelpMessage(cmdOptions);
+                return 1;
+            }
+            LOGGER.info("Table name: {}", tableName);
+            LOGGER.info("Coprocessor jar on hdfs: {}", jarPath);
+            String localJarPath = cmdCli.getOptionValue("localJar");
+            LOGGER.info("Coprocessor jar on local: {}", localJarPath);
+
+            for (String table : tableNames) {
+                LOGGER.info("Registering coprocessor for table {}", table);
+                registerCoprocessor(jarPath, table, localJarPath);
+            }
+        } else if (cmdCli.hasOption("unregister")) {
+            for (String table : tableNames) {
+                LOGGER.info("Unregistering coprocessor for table {}", table);
+                unregisterCoprocessor(table);
+            }
+        } else {
+            System.err.println("command is required, --register/--unregister");
+            printHelpMessage(cmdOptions);
+            return 1;
+        }
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto b/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
index c3385a1..da5846c 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/protobuf/AggregateProtocol.proto
@@ -27,7 +27,7 @@ option optimize_for = SPEED;
*/
import "Client.proto";
-//
+
//message ScanWrapper{
// required bytes byte_array = 1;
//}
@@ -43,7 +43,7 @@ message AggregateResult {
message AggregateRequest {
required EntityDefinition entity_definition = 1;
required Scan scan = 2;
- repeated string groupby_fields= 3;
+ repeated string groupby_fields = 3;
repeated bytes aggregate_func_types = 4;
repeated string aggregated_fields = 5;
}
@@ -51,7 +51,7 @@ message AggregateRequest {
message TimeSeriesAggregateRequest {
required EntityDefinition entity_definition = 1;
required Scan scan = 2;
- repeated string groupby_fields= 3;
+ repeated string groupby_fields = 3;
repeated bytes aggregate_func_types = 4;
repeated string aggregated_fields = 5;
required int64 start_time = 6;
@@ -60,6 +60,6 @@ message TimeSeriesAggregateRequest {
}
service AggregateProtocol {
- rpc aggregate(AggregateRequest) returns (AggregateResult);
- rpc timeseriesAggregate(TimeSeriesAggregateRequest) returns (AggregateResult);
+ rpc aggregate (AggregateRequest) returns (AggregateResult);
+ rpc timeseriesAggregate (TimeSeriesAggregateRequest) returns (AggregateResult);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
index 079cf12..6dcbc78 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStatement.java
@@ -67,18 +67,18 @@ public class TestHBaseStatement extends TestHBaseBase {
{
put("cluster", "test");
put("datacenter", "test");
- put("name","unit.test.name");
+ put("name", "unit.test.name");
}
});
entities.add(entity);
- CreateStatement createStatement = new CreateStatement(entities,"TestTimeSeriesAPIEntity");
+ CreateStatement createStatement = new CreateStatement(entities, "TestTimeSeriesAPIEntity");
ModifyResult resultSet = createStatement.execute(DataStorageManager.newDataStorage("hbase"));
Assert.assertEquals(1, resultSet.getIdentifiers().size());
- createStatement = new CreateStatement(entities,"TestTimeSeriesAPIEntity");
+ createStatement = new CreateStatement(entities, "TestTimeSeriesAPIEntity");
resultSet = createStatement.execute(DataStorageManager.newDataStorage("hbase"));
Assert.assertEquals(1, resultSet.getIdentifiers().size());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java
deleted file mode 100644
index 76cc507..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/TestHBaseStorage.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase;
-
-import org.junit.Test;
-
-/**
- * @since 3/23/15
- */
-public class TestHBaseStorage {
- @Test
- public void testCreate(){
-
- }
-
- @Test
- public void testQuery(){
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
index ba1b781..df5dcd9 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestAggregateResultCallback.java
@@ -16,26 +16,24 @@
*/
package org.apache.eagle.storage.hbase.aggregate.coprocessor;
+import org.apache.eagle.common.ByteUtil;
import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
-import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
-import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl;
import org.apache.eagle.query.aggregate.raw.GroupbyKey;
import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
import org.apache.eagle.query.aggregate.raw.GroupbyValue;
-import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResultCallback;
+import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-@Ignore
public class TestAggregateResultCallback {
@Test
- public void testUpdate(){
+ public void testUpdate() {
// -----------------------------------------------------------------------------
// key | max min count avg sum | count
// -----------------------------------------------------------------------------
@@ -49,89 +47,88 @@ public class TestAggregateResultCallback {
// -----------------------------------------------------------------------------
AggregateResultCallback callback = new AggregateResultCallbackImpl(Arrays.asList(
- AggregateFunctionType.max,
- AggregateFunctionType.min,
- AggregateFunctionType.count,
- AggregateFunctionType.avg,
- AggregateFunctionType.sum));
+ AggregateFunctionType.max,
+ AggregateFunctionType.min,
+ AggregateFunctionType.count,
+ AggregateFunctionType.avg,
+ AggregateFunctionType.sum));
AggregateResult result1 = AggregateResult.build(
Arrays.asList(
- new String[]{"a","b"},
- new String[]{"a","b"},
- new String[]{"a","b","c"},
- new String[]{"a","b","c"}
+ new String[]{"a", "b"},
+ new String[]{"a", "b"},
+ new String[]{"a", "b", "c"},
+ new String[]{"a", "b", "c"}
),
Arrays.asList(
- new double[]{1.0,2.0,3.0,4.0,5.0},
- new double[]{2.0,3.0,6.0,5.0,6.0},
- new double[]{3.0,3.0,5.0,5.0,6.0},
- new double[]{4.0,5.0,5.0,5.0,7.0}
+ new double[]{1.0, 2.0, 3.0, 4.0, 5.0},
+ new double[]{2.0, 3.0, 6.0, 5.0, 6.0},
+ new double[]{3.0, 3.0, 5.0, 5.0, 6.0},
+ new double[]{4.0, 5.0, 5.0, 5.0, 7.0}
),
- Arrays.asList(3,6,5,5),
+ Arrays.asList(3, 6, 5, 5),
System.currentTimeMillis(),
System.currentTimeMillis()
);
- callback.update(null,null,result1);
+ callback.update(null, null, result1);
AggregateResult callbackResult = callback.result();
- Assert.assertEquals(2,callbackResult.getKeyValues().size());
+ Assert.assertEquals(2, callbackResult.getKeyValues().size());
// == ROW-#0 ==
// Should be:
// key | max min count avg sum | count
// -----------------------------------------------------------------------------
- // a,b,c | 4 3 10 1 13 | 10
+ // a,b | 2 2 9 1 11 | 9
GroupbyKeyValue row0 = callbackResult.getKeyValues().get(0);
-// Assert.assertEquals("a",new String(row0.getKey().getValue().get(0).copyBytes()));
-// Assert.assertEquals("b",new String(row0.getKey().getValue().get(1).copyBytes()));
- Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(),"b".getBytes(),"c".getBytes())),row0.getKey());
- Assert.assertEquals(4.0,row0.getValue().get(0).get(), 0.00001);
- Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(0).getBytes()));
- Assert.assertEquals(3.0, row0.getValue().get(1).get(), 0.00001);
- Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(1).getBytes()));
- Assert.assertEquals(10.0,row0.getValue().get(2).get(), 0.00001);
- Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(2).getBytes()));
- Assert.assertEquals(1.0,row0.getValue().get(3).get(), 0.00001);
- Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(3).getBytes()));
- Assert.assertEquals(13.0,row0.getValue().get(4).get(), 0.00001);
- Assert.assertEquals(10, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+ Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(), "b".getBytes())), row0.getKey());
+ Assert.assertEquals(2.0, row0.getValue().get(0).get(), 0.00001);
+ Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+ Assert.assertEquals(2.0, row0.getValue().get(1).get(), 0.00001);
+ Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+ Assert.assertEquals(9.0, row0.getValue().get(2).get(), 0.00001);
+ Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+ Assert.assertEquals(1.0, row0.getValue().get(3).get(), 0.00001);
+ Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
+ Assert.assertEquals(11.0, row0.getValue().get(4).get(), 0.00001);
+ Assert.assertEquals(9, ByteUtil.bytesToInt(row0.getValue().getMeta(4).getBytes()));
// == ROW-#1 ==
// Should be:
// key | max min count avg sum | count
// -----------------------------------------------------------------------------
- // a,b | 2 2 9 1 11 | 9
+ // a,b,c | 4 3 10 1 13 | 10
GroupbyKeyValue row1 = callbackResult.getKeyValues().get(1);
- Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(),"b".getBytes())),row1.getKey());
- Assert.assertEquals(2.0,row1.getValue().get(0).get(), 0.00001);
- Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
- Assert.assertEquals(2.0, row1.getValue().get(1).get(), 0.00001);
- Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
- Assert.assertEquals(9.0,row1.getValue().get(2).get(), 0.00001);
- Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
- Assert.assertEquals(1.0,row1.getValue().get(3).get(), 0.00001);
- Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
- Assert.assertEquals(11.0,row1.getValue().get(4).get(), 0.00001);
- Assert.assertEquals(9, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
+ Assert.assertEquals(new GroupbyKey(Arrays.asList("a".getBytes(), "b".getBytes(), "c".getBytes())), row1.getKey());
+ Assert.assertEquals(4.0, row1.getValue().get(0).get(), 0.00001);
+ Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(0).getBytes()));
+ Assert.assertEquals(3.0, row1.getValue().get(1).get(), 0.00001);
+ Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(1).getBytes()));
+ Assert.assertEquals(10.0, row1.getValue().get(2).get(), 0.00001);
+ Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(2).getBytes()));
+ Assert.assertEquals(1.0, row1.getValue().get(3).get(), 0.00001);
+ Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(3).getBytes()));
+ Assert.assertEquals(13.0, row1.getValue().get(4).get(), 0.00001);
+ Assert.assertEquals(10, ByteUtil.bytesToInt(row1.getValue().getMeta(4).getBytes()));
+
}
@Test
- public void testAggregateResultTimestamp(){
+ public void testAggregateResultTimestamp() {
AggregateResult result1 = new AggregateResult();
- result1.setStartTimestamp(2l);
- result1.setStopTimestamp(4l);
+ result1.setStartTimestamp(2L);
+ result1.setStopTimestamp(4L);
AggregateResult result2 = new AggregateResult();
- result2.setStartTimestamp(1l);
- result2.setStopTimestamp(3l);
- AggregateResultCallback callback = new AggregateResultCallbackImpl(new ArrayList<AggregateFunctionType>());
- callback.update(null,null,result1);
- callback.update(null,null,result2);
+ result2.setStartTimestamp(1L);
+ result2.setStopTimestamp(3L);
+ AggregateResultCallback callback = new AggregateResultCallbackImpl(new ArrayList<AggregateFunctionType>());
+ callback.update(null, null, result1);
+ callback.update(null, null, result2);
AggregateResult result3 = callback.result();
- Assert.assertEquals(1l,result3.getStartTimestamp());
- Assert.assertEquals(4l,result3.getStopTimestamp());
+ Assert.assertEquals(1L, result3.getStartTimestamp());
+ Assert.assertEquals(4L, result3.getStopTimestamp());
}
@Test
- public void testUpdatePerformance(){
+ public void testUpdatePerformance() {
AggregateResultCallback callback = new AggregateResultCallbackImpl(
Arrays.asList(
AggregateFunctionType.max,
@@ -139,10 +136,10 @@ public class TestAggregateResultCallback {
AggregateFunctionType.count,
AggregateFunctionType.avg));
- for(int i=0;i<1000000;i++) {
+ for (int i = 0; i < 1000000; i++) {
AggregateResult result1 = new AggregateResult();
result1.setStartTimestamp(System.currentTimeMillis());
- List<GroupbyKeyValue> keyValues = new ArrayList<GroupbyKeyValue>();
+ final List<GroupbyKeyValue> keyValues = new ArrayList<GroupbyKeyValue>();
// <a,b> - <1*3, 2*3, 3*3, 4*3>
GroupbyKey key = new GroupbyKey();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
index 2a9ed0d..86c08fd 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/java/org/apache/eagle/storage/hbase/aggregate/coprocessor/TestGroupAggregateClient.java
@@ -31,10 +31,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,201 +52,201 @@ import org.apache.eagle.service.hbase.TestHBaseBase;
*/
@Ignore
public class TestGroupAggregateClient extends TestHBaseBase {
- HTableInterface table;
- long startTime;
- long endTime;
- List<String> rowkeys;
- AggregateClient client;
- Scan scan;
- int num = 200;
-
- private final static Logger LOG = LoggerFactory.getLogger(TestGroupAggregateClient.class);
-
- @Before
- public void setUp(){
- hbase.createTable("unittest", "f");
- startTime = System.currentTimeMillis();
- try {
- rowkeys = prepareData(num);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- endTime = System.currentTimeMillis();
- table = EagleConfigFactory.load().getHTable("unittest");
- client = new AggregateClientImpl();
- scan = new Scan();
- scan.setCaching(200);
-
- ListQueryCompiler compiler = null;
- try {
- compiler = new ListQueryCompiler("TestLogAPIEntity[@cluster=\"test4UT\" and @datacenter=\"dc1\"]{@field1,@field2}");
- } catch (Exception e) {
- Assert.fail(e.getMessage());
- }
- scan.setFilter(compiler.filter());
- }
-
- @After
- public void shutdown(){
- try {
- hbase.deleteTable("unittest");
- new HTableFactory().releaseHTableInterface(table);
- } catch (IOException e) {
- LOG.error(e.getMessage(),e);
- }
- }
-
- private List<String> prepareData(int count) throws Exception {
- List<TaggedLogAPIEntity> list = new ArrayList<TaggedLogAPIEntity>();
- EntityDefinition ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
-
- if (ed == null) {
- EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
- ed = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
- }
- ed.setTimeSeries(true);
- for(int i=0;i<count;i++){
- TestLogAPIEntity e = new TestLogAPIEntity();
- e.setTimestamp(System.currentTimeMillis());
- e.setField1(1);
- e.setField2(2);
- e.setField3(3);
- e.setField4(4L);
- e.setField5(5.0);
- e.setField6(5.0);
- e.setField7("7");
- e.setTags(new HashMap<String, String>());
- e.getTags().put("cluster", "test4UT");
- e.getTags().put("datacenter", "dc1");
- e.getTags().put("index", ""+i);
- e.getTags().put("jobId", "job_"+System.currentTimeMillis());
- list.add(e);
-
- }
- GenericEntityWriter writer = new GenericEntityWriter(ed.getService());
- LOG.info("Writing "+list.size()+" TestLogAPIEntity entities");
- List<String> result = writer.write(list);
- LOG.info("Finish writing test entities");
- return result;
- }
-
- //@Test
- public void testGroupAggregateCountClient(){
- try {
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
- List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.count),Arrays.asList("field2")).getKeyValues();
- if(LOG.isDebugEnabled()) LOG.debug("COUNT");
- logGroupbyKeyValue(result);
- Assert.assertNotNull(result);
- Assert.assertTrue(result.size()>0);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- //@Test
- public void testGroupAggregateAvgClient(){
- try {
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
- List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.avg),Arrays.asList("field2")).getKeyValues();
- if(LOG.isDebugEnabled()) LOG.debug("AVG");
- logGroupbyKeyValue(result);
- Assert.assertNotNull(result);
- Assert.assertTrue(result.size()>0);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- //@Test
- public void testGroupAggregateMaxClient(){
- try {
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
- List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.max),Arrays.asList("field1")).getKeyValues();
- if(LOG.isDebugEnabled()) LOG.debug("MAX");
- logGroupbyKeyValue(result);
- Assert.assertNotNull(result);
- Assert.assertTrue(result.size()>0);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- //@Test
- public void testGroupAggregateSumClient(){
- try {
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
- List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.sum),Arrays.asList("field2")).getKeyValues();
- if(LOG.isDebugEnabled()) LOG.debug("MAX");
- logGroupbyKeyValue(result);
- Assert.assertNotNull(result);
- Assert.assertTrue(result.size()>0);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- //@Test
- public void testGroupAggregateMinClient(){
-
- try {
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
- List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),Arrays.asList(AggregateFunctionType.min),Arrays.asList("field2")).getKeyValues();
- if(LOG.isDebugEnabled()) LOG.debug("MIN");
- logGroupbyKeyValue(result);
- Assert.assertNotNull(result);
- Assert.assertTrue(result.size()>0);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- //@Test
- public void testGroupAggregateMultipleClient(){
- try {
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
- List<GroupbyKeyValue> result = client.aggregate(table,ed,scan, Arrays.asList("cluster","datacenter"),
- Arrays.asList(AggregateFunctionType.min,
- AggregateFunctionType.max,
- AggregateFunctionType.avg,
- AggregateFunctionType.count,
- AggregateFunctionType.sum),
- Arrays.asList("field2","field2","field2","field2","field2")).getKeyValues();
- logGroupbyKeyValue(result);
- Assert.assertNotNull(result);
- Assert.assertTrue(result.size() > 0);
- Assert.assertEquals("test4UT", new String(result.get(0).getKey().getValue().get(0).copyBytes()));
- Assert.assertEquals("dc1", new String(result.get(0).getKey().getValue().get(1).copyBytes()));
- Assert.assertEquals(2.0, result.get(0).getValue().get(0).get(), 0.00001);
- Assert.assertEquals(2.0, result.get(0).getValue().get(1).get(), 0.00001);
- Assert.assertEquals(2.0, result.get(0).getValue().get(2).get(), 0.00001);
- Assert.assertTrue(num <= result.get(0).getValue().get(3).get());
- Assert.assertTrue(2.0 * num <= result.get(0).getValue().get(4).get());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- private void logGroupbyKeyValue(List<GroupbyKeyValue> keyValues){
- for(GroupbyKeyValue keyValue:keyValues){
- GroupbyKey key = keyValue.getKey();
- List<String> keys = new ArrayList<String>();
- for(BytesWritable bytes:key.getValue()){
- keys.add(new String(bytes.copyBytes()));
- }
- List<Double> vals = new ArrayList<Double>();
- GroupbyValue val = keyValue.getValue();
- for(DoubleWritable dw:val.getValue()){
- vals.add(dw.get());
- }
- if(LOG.isDebugEnabled()) LOG.debug("KEY: "+keys+", VALUE: "+vals);
- }
- }
+ HTableInterface table;
+ long startTime;
+ long endTime;
+ List<String> rowkeys;
+ AggregateClient client;
+ Scan scan;
+ int num = 200;
+
+ private final static Logger LOG = LoggerFactory.getLogger(TestGroupAggregateClient.class);
+
+ /**
+  * Creates the "unittest" table, loads {@code num} sample entities, and prepares the
+  * table handle, aggregate client and filtered scan shared by the test methods.
+  */
+ @Before
+ public void setUp() {
+     hbase.createTable("unittest", "f");
+
+     // startTime/endTime bracket the data load.
+     startTime = System.currentTimeMillis();
+     try {
+         rowkeys = prepareData(num);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail(ex.getMessage());
+     }
+     endTime = System.currentTimeMillis();
+
+     table = EagleConfigFactory.load().getHTable("unittest");
+     client = new AggregateClientImpl();
+
+     scan = new Scan();
+     scan.setCaching(200);
+
+     // The scan filter is derived from a list query over the sample entities' tags.
+     final String query = "TestLogAPIEntity[@cluster=\"test4UT\" and @datacenter=\"dc1\"]{@field1,@field2}";
+     ListQueryCompiler compiler = null;
+     try {
+         compiler = new ListQueryCompiler(query);
+     } catch (Exception ex) {
+         Assert.fail(ex.getMessage());
+     }
+     scan.setFilter(compiler.filter());
+ }
+
+ /**
+  * Drops the "unittest" table and releases the table handle after each test.
+  * An {@link IOException} raised during cleanup is logged rather than rethrown.
+  */
+ @After
+ public void shutdown() {
+     try {
+         try {
+             hbase.deleteTable("unittest");
+         } finally {
+             // Release the handle even when dropping the table fails, so it is not leaked.
+             new HTableFactory().releaseHTableInterface(table);
+         }
+     } catch (IOException e) {
+         LOG.error(e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Builds {@code count} TestLogAPIEntity instances tagged with cluster "test4UT" and
+  * datacenter "dc1", writes them through a GenericEntityWriter and returns what the
+  * writer returns.
+  *
+  * @param count number of entities to create
+  * @return the list returned by {@code GenericEntityWriter.write}
+  * @throws Exception if entity registration or writing fails
+  */
+ private List<String> prepareData(int count) throws Exception {
+     EntityDefinition definition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
+     if (definition == null) {
+         // Register the entity class on first use, then look it up again.
+         EntityDefinitionManager.registerEntity(TestLogAPIEntity.class);
+         definition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestLogAPIEntity.class);
+     }
+     definition.setTimeSeries(true);
+
+     List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+     for (int index = 0; index < count; index++) {
+         TestLogAPIEntity entity = new TestLogAPIEntity();
+         entity.setTimestamp(System.currentTimeMillis());
+         entity.setField1(1);
+         entity.setField2(2);
+         entity.setField3(3);
+         entity.setField4(4L);
+         entity.setField5(5.0);
+         entity.setField6(5.0);
+         entity.setField7("7");
+         entity.setTags(new HashMap<String, String>());
+         entity.getTags().put("cluster", "test4UT");
+         entity.getTags().put("datacenter", "dc1");
+         entity.getTags().put("index", String.valueOf(index));
+         entity.getTags().put("jobId", "job_" + System.currentTimeMillis());
+         entities.add(entity);
+     }
+
+     GenericEntityWriter writer = new GenericEntityWriter(definition.getService());
+     LOG.info("Writing " + entities.size() + " TestLogAPIEntity entities");
+     List<String> written = writer.write(entities);
+     LOG.info("Finish writing test entities");
+     return written;
+ }
+
+ /**
+  * Runs a count aggregation over field2 grouped by cluster and datacenter and
+  * expects a non-null, non-empty result.
+  */
+ @Test
+ public void testGroupAggregateCountClient() {
+     try {
+         EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+         List<String> groupBy = Arrays.asList("cluster", "datacenter");
+         List<AggregateFunctionType> functions = Arrays.asList(AggregateFunctionType.count);
+         List<String> fields = Arrays.asList("field2");
+         List<GroupbyKeyValue> rows = client.aggregate(table, definition, scan, groupBy, functions, fields).getKeyValues();
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("COUNT");
+         }
+         logGroupbyKeyValue(rows);
+         Assert.assertNotNull(rows);
+         Assert.assertTrue(rows.size() > 0);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail(ex.getMessage());
+     }
+ }
+
+ /**
+  * Runs an avg aggregation over field2 grouped by cluster and datacenter and
+  * expects a non-null, non-empty result.
+  */
+ @Test
+ public void testGroupAggregateAvgClient() {
+     try {
+         EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+         List<String> groupBy = Arrays.asList("cluster", "datacenter");
+         List<AggregateFunctionType> functions = Arrays.asList(AggregateFunctionType.avg);
+         List<String> fields = Arrays.asList("field2");
+         List<GroupbyKeyValue> rows = client.aggregate(table, definition, scan, groupBy, functions, fields).getKeyValues();
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("AVG");
+         }
+         logGroupbyKeyValue(rows);
+         Assert.assertNotNull(rows);
+         Assert.assertTrue(rows.size() > 0);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail(ex.getMessage());
+     }
+ }
+
+ /**
+  * Runs a max aggregation over field1 grouped by cluster and datacenter and
+  * expects a non-null, non-empty result.
+  */
+ @Test
+ public void testGroupAggregateMaxClient() {
+     try {
+         EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+         List<String> groupBy = Arrays.asList("cluster", "datacenter");
+         List<AggregateFunctionType> functions = Arrays.asList(AggregateFunctionType.max);
+         List<String> fields = Arrays.asList("field1");
+         List<GroupbyKeyValue> rows = client.aggregate(table, definition, scan, groupBy, functions, fields).getKeyValues();
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("MAX");
+         }
+         logGroupbyKeyValue(rows);
+         Assert.assertNotNull(rows);
+         Assert.assertTrue(rows.size() > 0);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail(ex.getMessage());
+     }
+ }
+
+ /**
+  * Runs a sum aggregation over field2 grouped by cluster and datacenter and
+  * expects a non-null, non-empty result.
+  */
+ @Test
+ public void testGroupAggregateSumClient() {
+     try {
+         EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+         List<GroupbyKeyValue> result = client.aggregate(table, ed, scan, Arrays.asList("cluster", "datacenter"), Arrays.asList(AggregateFunctionType.sum), Arrays.asList("field2")).getKeyValues();
+         // The label names the function under test; it previously read "MAX" (copied from the max test).
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("SUM");
+         }
+         logGroupbyKeyValue(result);
+         Assert.assertNotNull(result);
+         Assert.assertTrue(result.size() > 0);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail(e.getMessage());
+     }
+ }
+
+ /**
+  * Runs a min aggregation over field2 grouped by cluster and datacenter and
+  * expects a non-null, non-empty result.
+  */
+ @Test
+ public void testGroupAggregateMinClient() {
+     try {
+         EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+         List<String> groupBy = Arrays.asList("cluster", "datacenter");
+         List<AggregateFunctionType> functions = Arrays.asList(AggregateFunctionType.min);
+         List<String> fields = Arrays.asList("field2");
+         List<GroupbyKeyValue> rows = client.aggregate(table, definition, scan, groupBy, functions, fields).getKeyValues();
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("MIN");
+         }
+         logGroupbyKeyValue(rows);
+         Assert.assertNotNull(rows);
+         Assert.assertTrue(rows.size() > 0);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail(ex.getMessage());
+     }
+ }
+
+ /**
+  * Runs min, max, avg, count and sum over field2 in a single request, grouped by
+  * cluster and datacenter, and checks the first returned row: the key must be
+  * ("test4UT", "dc1"); min, max and avg must equal 2.0; count must be at least
+  * {@code num} and sum at least {@code 2.0 * num}.
+  */
+ @Test
+ public void testGroupAggregateMultipleClient() {
+     try {
+         EntityDefinition definition = EntityDefinitionManager.getEntityByServiceName("TestLogAPIEntity");
+         List<String> groupBy = Arrays.asList("cluster", "datacenter");
+         List<AggregateFunctionType> functions = Arrays.asList(
+             AggregateFunctionType.min,
+             AggregateFunctionType.max,
+             AggregateFunctionType.avg,
+             AggregateFunctionType.count,
+             AggregateFunctionType.sum);
+         List<String> fields = Arrays.asList("field2", "field2", "field2", "field2", "field2");
+         List<GroupbyKeyValue> rows = client.aggregate(table, definition, scan, groupBy, functions, fields).getKeyValues();
+         logGroupbyKeyValue(rows);
+         Assert.assertNotNull(rows);
+         Assert.assertTrue(rows.size() > 0);
+
+         // Group-by key of the first row.
+         Assert.assertEquals("test4UT", new String(rows.get(0).getKey().getValue().get(0).copyBytes()));
+         Assert.assertEquals("dc1", new String(rows.get(0).getKey().getValue().get(1).copyBytes()));
+
+         // Values follow the order of the requested functions: min, max, avg, count, sum.
+         Assert.assertEquals(2.0, rows.get(0).getValue().get(0).get(), 0.00001);
+         Assert.assertEquals(2.0, rows.get(0).getValue().get(1).get(), 0.00001);
+         Assert.assertEquals(2.0, rows.get(0).getValue().get(2).get(), 0.00001);
+         Assert.assertTrue(num <= rows.get(0).getValue().get(3).get());
+         Assert.assertTrue(2.0 * num <= rows.get(0).getValue().get(4).get());
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail(ex.getMessage());
+     }
+ }
+
+ /**
+  * Writes each group-by key and its aggregated values to the debug log.
+  * Nothing is decoded or logged when debug logging is disabled.
+  *
+  * @param keyValues aggregation rows to log
+  */
+ private void logGroupbyKeyValue(List<GroupbyKeyValue> keyValues) {
+     // Check once up front so the key/value lists are not built when debug is off.
+     if (!LOG.isDebugEnabled()) {
+         return;
+     }
+     for (GroupbyKeyValue keyValue : keyValues) {
+         List<String> keys = new ArrayList<String>();
+         for (BytesWritable bytes : keyValue.getKey().getValue()) {
+             keys.add(new String(bytes.copyBytes()));
+         }
+         List<Double> vals = new ArrayList<Double>();
+         for (DoubleWritable dw : keyValue.getValue().getValue()) {
+             vals.add(dw.get());
+         }
+         LOG.debug("KEY: {}, VALUE: {}", keys, vals);
+     }
+ }
}
\ No newline at end of file