You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:46 UTC
[50/52] [abbrv] incubator-eagle git commit: [EAGLE-520] Fix and
decouple co-processor from eagle aggreation query service
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java
deleted file mode 100755
index 8f20b61..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.storage.hbase.query.aggregate;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.HBaseInternalLogHelper;
-import org.apache.eagle.log.entity.SearchCondition;
-import org.apache.eagle.log.entity.meta.EntityConstants;
-import org.apache.eagle.log.entity.meta.EntityDefinition;
-import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
-import org.apache.eagle.query.GenericQuery;
-import org.apache.eagle.query.QueryConstants;
-import org.apache.eagle.query.aggregate.AggregateCondition;
-import org.apache.eagle.query.aggregate.AggregateFunctionType;
-import org.apache.eagle.query.aggregate.raw.GroupbyKey;
-import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
-import org.apache.eagle.query.aggregate.raw.GroupbyValue;
-import org.apache.eagle.query.aggregate.timeseries.PostFlatAggregateSort;
-import org.apache.eagle.query.aggregate.timeseries.SortOption;
-import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator;
-import org.apache.eagle.query.aggregate.timeseries.TimeSeriesPostFlatAggregateSort;
-import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
-import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.*;
-
-/**
- * AggregateQuery
- *
- * <ol>
- * <li>Open HBase connection</li>
- * <li>Aggregate through Coprocessor</li>
- * <li>Build GroupAggregateQuery.GroupAggregateQueryReader to process result and order as sort options</li>
- * <li>Return result list</li>
- * </ol>
- *
- * @since : 11/7/14,2014
- */
-public class GenericAggregateQuery implements GenericQuery {
- private static final Logger LOG = LoggerFactory.getLogger(GenericAggregateQuery.class);
- private final List<AggregateFunctionType> sortFuncs;
- private final List<String> sortFields;
-
- private EntityDefinition entityDef;
- private SearchCondition searchCondition;
- private AggregateCondition aggregateCondition;
- private String prefix;
- private long lastTimestamp = 0;
- private long firstTimestamp = 0;
- private List<SortOption> sortOptions;
- private int top;
-
- private int aggFuncNum;
- private int sortAggFuncNum;
- private int sortFuncNum;
-
- /**
- *
- * @param serviceName
- * @param condition
- * @param aggregateCondition
- * @param metricName
- * @throws InstantiationException
- * @throws IllegalAccessException
- */
- public GenericAggregateQuery(String serviceName, SearchCondition condition, AggregateCondition aggregateCondition, String metricName)
- throws InstantiationException, IllegalAccessException{
- this(serviceName, condition, aggregateCondition, metricName,null,null,null,0);
- }
-
- /**
- *
- * @param serviceName
- * @param condition
- * @param aggregateCondition
- * @param metricName
- * @param sortOptions
- * @param sortFunctionTypes
- * @param sortFields
- * @param top
- * @throws InstantiationException
- * @throws IllegalAccessException
- */
- public GenericAggregateQuery(String serviceName, SearchCondition condition,
- AggregateCondition aggregateCondition, String metricName,
- List<SortOption> sortOptions,List<AggregateFunctionType> sortFunctionTypes,List<String> sortFields,int top)
- throws InstantiationException, IllegalAccessException{
- checkNotNull(serviceName, "serviceName");
- this.searchCondition = condition;
- this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
- checkNotNull(entityDef, "EntityDefinition");
- checkNotNull(entityDef, "GroupAggregateCondition");
- this.aggregateCondition = aggregateCondition;
- this.aggFuncNum = this.aggregateCondition.getAggregateFunctionTypes().size();
- this.sortOptions = sortOptions;
- this.sortFuncs = sortFunctionTypes;
- this.sortFuncNum = this.sortOptions == null ? 0: this.sortOptions.size();
- this.sortFields = sortFields;
- this.top = top;
-
- if(serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){
- if(LOG.isDebugEnabled()) LOG.debug("list metric aggregate query");
- if(metricName == null || metricName.isEmpty()){
- throw new IllegalArgumentException("metricName should not be empty for metric list query");
- }
- if(!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)){
- condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
- }
- this.prefix = metricName;
- }else{
- if(LOG.isDebugEnabled()) LOG.debug("list entity aggregate query");
- this.prefix = entityDef.getPrefix();
- }
-
- // Add sort oriented aggregation functions into aggregateCondtion
- if(this.sortOptions!=null){
- // if sort for time series aggregation
- if(this.aggregateCondition.isTimeSeries()) {
- this.sortAggFuncNum = 0;
- int index = 0;
- for (SortOption sortOption : this.sortOptions) {
- if (!sortOption.isInGroupby()) {
- if (LOG.isDebugEnabled())
- LOG.debug("Add additional aggregation functions for sort options " + sortOption.toString() + " in index: " + (this.aggFuncNum + this.sortAggFuncNum));
- AggregateFunctionType _sortFunc = this.sortFuncs.get(index);
- if (AggregateFunctionType.avg.equals(_sortFunc)) {
- this.aggregateCondition.getAggregateFunctionTypes().add(AggregateFunctionType.sum);
- } else {
- this.aggregateCondition.getAggregateFunctionTypes().add(_sortFunc);
- }
- this.aggregateCondition.getAggregateFields().add(this.sortFields.get(index));
-
- sortOption.setIndex(this.sortAggFuncNum);
- sortAggFuncNum++;
- }
- index++;
- }
- }
- }
- }
-
-
- private void checkNotNull(Object o, String message){
- if(o == null){
- throw new IllegalArgumentException(message + " should not be null");
- }
- }
-
- /**
- * TODO: Return List<GroupAggregateAPIEntity>
- *
- * @see GenericAggregateQuery.TimeSeriesGroupAggregateQueryReader#result()
- * @see GenericAggregateQuery.FlatGroupAggregateQueryReader#result()
- *
- */
- @Override
- @SuppressWarnings("raw")
- public List result() throws Exception {
- Date start = null;
- Date end = null;
- // shortcut to avoid read when pageSize=0
- if(searchCondition.getPageSize() <= 0){
- return null;
- }
- // Process the time range if needed
- if(entityDef.isTimeSeries()){
- start = DateTimeUtil.humanDateToDate(searchCondition.getStartTime());
- end = DateTimeUtil.humanDateToDate(searchCondition.getEndTime());
- }else{
- start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
- end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
- }
- // Generate the output qualifiers
- final byte[][] outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, searchCondition.getOutputFields());
- GenericAggregateReader reader = new GenericAggregateReader(entityDef,
- searchCondition.getPartitionValues(),
- start, end, searchCondition.getFilter(), searchCondition.getStartRowkey(), outputQualifiers, this.prefix,this.aggregateCondition);
- try{
- if(LOG.isDebugEnabled()) LOG.debug("open and read group aggregate reader");
- reader.open();
- List result = buildGroupAggregateQueryReader(reader,this.aggregateCondition.isTimeSeries()).result();
- if(result == null) throw new IOException("result is null");
- this.firstTimestamp = reader.getFirstTimestamp();
- this.lastTimestamp = reader.getLastTimestamp();
- if(LOG.isDebugEnabled()) LOG.debug("finish read aggregated " + result.size() + " rows");
- return result;
- }catch (IOException ex){
- LOG.error("Fail reading aggregated results", ex);
- throw ex;
- }finally{
- if(reader != null) {
- if(LOG.isDebugEnabled()) LOG.debug("Release HBase connection");
- reader.close();
- }
- }
- }
-
- ///////////////////////////////////////////////////////////
- // GroupAggregateQueryReader(GroupAggregateLogReader)
- // |_ FlatGroupAggregateQueryReader
- // |_ TimeSeriesGroupAggregateQueryReader
- ///////////////////////////////////////////////////////////
-
- /**
- * Factory method for {@link GroupAggregateQueryReader}
- * <pre>
- * {@link GroupAggregateQueryReader}
- * |_ {@link FlatGroupAggregateQueryReader}
- * |_ {@link TimeSeriesGroupAggregateQueryReader}
- * </pre>
- * @param reader
- * @param isTimeSeries
- * @return
- * @throws IOException
- */
- private GroupAggregateQueryReader buildGroupAggregateQueryReader(GenericAggregateReader reader,boolean isTimeSeries) throws IOException{
- if(isTimeSeries){
- return new TimeSeriesGroupAggregateQueryReader(reader,this);
- }else{
- return new FlatGroupAggregateQueryReader(reader,this);
- }
- }
-
- private abstract class GroupAggregateQueryReader {
- protected final GenericAggregateReader reader;
- protected final GenericAggregateQuery query;
-
- public GroupAggregateQueryReader(GenericAggregateReader reader, GenericAggregateQuery query){
- this.reader = reader;
- this.query = query;
- }
- public abstract <T> List<T> result() throws Exception;
-
- protected Map<List<String>, List<Double>> keyValuesToMap(List<GroupbyKeyValue> entities) throws Exception {
- Map<List<String>, List<Double>> aggResultMap = new HashMap<List<String>, List<Double>>();
- try {
- for(GroupbyKeyValue keyValue:entities){
- List<String> key = new ArrayList<String>();
- for(BytesWritable bw:keyValue.getKey().getValue()){
- key.add(new String(bw.copyBytes(), QueryConstants.CHARSET));
- }
- List<Double> value = new ArrayList<Double>();
- for(DoubleWritable wa:keyValue.getValue().getValue()){
- value.add(wa.get());
- }
- aggResultMap.put(key, value);
- }
- } catch (UnsupportedEncodingException e) {
- LOG.error(QueryConstants.CHARSET +" not support: "+e.getMessage(),e);
- }
- return aggResultMap;
- }
- }
-
- private class FlatGroupAggregateQueryReader extends GroupAggregateQueryReader{
- public FlatGroupAggregateQueryReader(GenericAggregateReader reader, GenericAggregateQuery query) {
- super(reader,query);
- }
- @Override
- public List<Map.Entry<List<String>, List<Double>>> result() throws Exception {
- Map<List<String>, List<Double>> aggResultMap = this.keyValuesToMap(this.reader.read());
- if(this.query.sortOptions == null)
- return new ArrayList<Map.Entry<List<String>, List<Double>>>(aggResultMap.entrySet());
- if(LOG.isDebugEnabled()) LOG.debug("Flat sorting");
- return PostFlatAggregateSort.sort(aggResultMap, this.query.sortOptions, this.query.top);
- }
- }
-
- private class TimeSeriesGroupAggregateQueryReader extends GroupAggregateQueryReader{
- private final Date start;
- private final Date end;
- private final int pointsNum;
- private final int aggFuncNum;
- private final List<SortOption> sortOptions;
- private final List<AggregateFunctionType> sortFuncs;
- private final int sortAggFuncNum;
-
- public TimeSeriesGroupAggregateQueryReader(GenericAggregateReader reader, GenericAggregateQuery query) throws IOException {
- super(reader,query);
- try {
- if(entityDef.isTimeSeries()){
- this.start = DateTimeUtil.humanDateToDate(searchCondition.getStartTime());
- this.end = DateTimeUtil.humanDateToDate(searchCondition.getEndTime());
- }else{
- start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
- end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
- }
- this.pointsNum = (int)((end.getTime()-1-start.getTime())/this.query.aggregateCondition.getIntervalMS() + 1);
- this.aggFuncNum = this.query.aggFuncNum;
- this.sortOptions = this.query.sortOptions;
- this.sortFuncs = this.query.sortFuncs;
- this.sortAggFuncNum = this.query.sortAggFuncNum;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * <h2>TimeSeriesReader result</h2>
- * <ol>
- * <li>generateTimeSeriesDataPoints()</li>
- * <li>if not sort options, return generate time series data points</li>
- * <li>if requiring sort, sort time series data points by order of flat aggregation</li>
- * </ol>
- *
- * <h2>Time Series Sort Algorithms</h2>
- * <ol>
- * <li>Flat aggregate on grouped fields without time series bucket index</li>
- * <li>Flat aggregated result according given sortOptions</li>
- * <li>Sort Time Series Result according the same order of flat aggregated keys</li>
- * </ol>
- *
- * @see #convertToTimeSeriesDataPoints(java.util.List)
- *
- * @return
- * @throws Exception
- */
- @Override
- public List<Map.Entry<List<String>, List<double[]>>> result() throws Exception {
- List<GroupbyKeyValue> result = this.reader.read();
-
- // aggregated data points only
- Map<List<String>,List<double[]>> timeseriesDataPoints = convertToTimeSeriesDataPoints(result);
-
- if(this.query.sortOptions == null)
- // return time-series data points without sort
- return new ArrayList<Map.Entry<List<String>, List<double[]>>>(timeseriesDataPoints.entrySet());
-
- LOG.info("Time series sorting");
-
- // Time Series Sort Steps
- // ======================
- // 1. Flat aggregate on grouped fields without time series bucket index
- // 2. Flat aggregated result according given sortOptions
- // 3. Sort Time Series Result according flat aggregated keys' order
-
- // 1. Flat aggregate on grouped fields without time series bucket index
- AggregateResultCallbackImpl callback = new AggregateResultCallbackImpl(this.sortFuncs);
- for(GroupbyKeyValue kv:result){
- ArrayList<BytesWritable> copykey = new ArrayList<BytesWritable>(kv.getKey().getValue());
- // remove time series bucket index
- copykey.remove(copykey.size()-1);
- GroupbyKey key = new GroupbyKey();
-
- // [this.aggFuncNum,this.aggFuncNum + this.sortFuncNum)
- GroupbyValue value = new GroupbyValue();
- for(int i = this.aggFuncNum;i<this.aggFuncNum+this.sortAggFuncNum;i++){
- value.add(kv.getValue().get(i));
- value.addMeta(kv.getValue().getMeta(i));
- }
- key.addAll(copykey);
- GroupbyKeyValue keyValue = new GroupbyKeyValue(key,value);
- callback.update(keyValue);
- }
- AggregateResult callbackResult = callback.result();
- Map<List<String>, List<Double>> mapForSort = this.keyValuesToMap(callbackResult.getKeyValues());
-
- // 2. Flat aggregated result according given sortOptions
-// List<Map.Entry<List<String>, List<Double>>> flatSort = PostFlatAggregateSort.sort(mapForSort , this.sortOptions, Integer.MAX_VALUE);
-// mapForSort = new HashMap<List<String>, List<Double>>();
-// for(Map.Entry<List<String>, List<Double>> entry:flatSort){
-// mapForSort.put(entry.getKey(),entry.getValue());
-// }
-
- // 3. Sort Time Series Result according flat aggregated keys' order
- return TimeSeriesPostFlatAggregateSort.sort(mapForSort,timeseriesDataPoints,this.sortOptions,this.query.top);
- }
-
- /**
- * Convert raw GroupbyKeyValue list into time-series data points hash map
- *
- * @param result <code>List<GroupbyKeyValue></code>
- * @return Map<List<String>,List<double[]>>
- * @throws Exception
- */
- private Map<List<String>,List<double[]>> convertToTimeSeriesDataPoints(List<GroupbyKeyValue> result) throws Exception {
- Map<List<String>, List<Double>> aggResultMap = this.keyValuesToMap(result);
- Map<List<String>,List<double[]>> timeseriesDataPoints = TimeSeriesAggregator.toMetric(aggResultMap,this.pointsNum,this.aggFuncNum);
- return timeseriesDataPoints;
- }
- }
-
- /**
- * Get last / max timestamp
- *
- * @return lastTimestamp
- */
- @Override
- public long getLastTimestamp() {
- return this.lastTimestamp;
- }
-
- /**
- * Get first / min timestamp
- *
- * @return firstTimestamp
- */
- @Override
- public long getFirstTimeStamp() {
- return this.firstTimestamp;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java
index 3d0fa94..67c957b 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java
@@ -19,10 +19,11 @@ package org.apache.eagle.storage.hbase.query.aggregate;
import org.apache.eagle.log.entity.AbstractHBaseLogReader;
import org.apache.eagle.log.entity.meta.EntityDefinition;
import org.apache.eagle.query.aggregate.AggregateCondition;
-import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateClientImpl;
import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
import org.apache.eagle.storage.hbase.query.coprocessor.AggregateClient;
import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
+import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateClientImpl;
+
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
@@ -31,101 +32,93 @@ import java.io.IOException;
import java.util.Date;
import java.util.List;
-/**
- * @since : 11/7/14,2014
- */
public class GenericAggregateReader extends AbstractHBaseLogReader<List<GroupbyKeyValue>> {
- private final long startTime;
- private final long endTime;
- private AggregateClient aggregateClient = new AggregateClientImpl();
- private EntityDefinition ed;
- private final AggregateCondition aggregateCondition;
- private AggregateResult result;
+ private final long startTime;
+ private final long endTime;
+ private AggregateClient aggregateClient = new AggregateClientImpl();
+ private EntityDefinition ed;
+ private final AggregateCondition aggregateCondition;
+ private AggregateResult result;
- /**
- *
- * @param ed Entity Definition
- * @param partitions Partition values
- * @param startTime Start time
- * @param endTime End time
- * @param filter HBase filter for scanning
- * @param lastScanKey Last HBase scan row key in String
- * @param outputQualifiers HBase output qualifiers in bytes
- * @param condition GroupAggregateCondition Object
- *
- * @see org.apache.eagle.query.aggregate.AggregateCondition
- */
- @SuppressWarnings("unused")
- private GenericAggregateReader(EntityDefinition ed,
- List<String> partitions,
- Date startTime,
- Date endTime,
- Filter filter,
- String lastScanKey,
- byte[][] outputQualifiers,
- AggregateCondition condition) {
- super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
- this.ed = ed;
- this.startTime = startTime.getTime();
- this.endTime = endTime.getTime();
- this.aggregateCondition = condition;
- }
+ /**
+ * @param ed Entity Definition
+ * @param partitions Partition values
+ * @param startTime Start time
+ * @param endTime End time
+ * @param filter HBase filter for scanning
+ * @param lastScanKey Last HBase scan row key in String
+ * @param outputQualifiers HBase output qualifiers in bytes
+ * @param condition GroupAggregateCondition Object
+ * @see org.apache.eagle.query.aggregate.AggregateCondition
+ */
+ private GenericAggregateReader(EntityDefinition ed,
+ List<String> partitions,
+ Date startTime,
+ Date endTime,
+ Filter filter,
+ String lastScanKey,
+ byte[][] outputQualifiers,
+ AggregateCondition condition) {
+ super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
+ this.ed = ed;
+ this.startTime = startTime.getTime();
+ this.endTime = endTime.getTime();
+ this.aggregateCondition = condition;
+ }
- /**
- *
- * @param ed Entity Definition
- * @param partitions Partition values
- * @param startTime Start time
- * @param endTime End time
- * @param filter HBase filter for scanning
- * @param lastScanKey Last HBase scan row key in String
- * @param outputQualifiers HBase output qualifiers in bytes
- * @param prefix HBase prefix, not necessary except for GenericMetric query
- * @param condition GroupAggregateCondition Object
- *
- * @see org.apache.eagle.query.aggregate.AggregateCondition
- */
- public GenericAggregateReader(EntityDefinition ed,
- List<String> partitions,
- Date startTime,
- Date endTime,
- Filter filter,
- String lastScanKey,
- byte[][] outputQualifiers,
- String prefix,
- AggregateCondition condition) {
- super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
- this.ed = ed;
- this.startTime = startTime.getTime();
- this.endTime = endTime.getTime();
- this.aggregateCondition = condition;
- }
+ /**
+ * @param ed Entity Definition
+ * @param partitions Partition values
+ * @param startTime Start time
+ * @param endTime End time
+ * @param filter HBase filter for scanning
+ * @param lastScanKey Last HBase scan row key in String
+ * @param outputQualifiers HBase output qualifiers in bytes
+ * @param prefix HBase prefix, not necessary except for GenericMetric query
+ * @param condition GroupAggregateCondition Object
+ * @see org.apache.eagle.query.aggregate.AggregateCondition
+ */
+ public GenericAggregateReader(EntityDefinition ed,
+ List<String> partitions,
+ Date startTime,
+ Date endTime,
+ Filter filter,
+ String lastScanKey,
+ byte[][] outputQualifiers,
+ String prefix,
+ AggregateCondition condition) {
+ super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
+ this.ed = ed;
+ this.startTime = startTime.getTime();
+ this.endTime = endTime.getTime();
+ this.aggregateCondition = condition;
+ }
- @Override
- protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
- this.result = this.aggregateClient.aggregate(
- tbl,
- this.ed,
- scan,
- this.aggregateCondition.getGroupbyFields(),
- this.aggregateCondition.getAggregateFunctionTypes(),
- this.aggregateCondition.getAggregateFields(),
- this.aggregateCondition.isTimeSeries(),
- this.startTime,
- this.endTime,
- this.aggregateCondition.getIntervalMS());
- }
+ @Override
+ protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
+ this.result = this.aggregateClient.aggregate(
+ tbl,
+ this.ed,
+ scan,
+ this.aggregateCondition.getGroupbyFields(),
+ this.aggregateCondition.getAggregateFunctionTypes(),
+ this.aggregateCondition.getAggregateFields(),
+ this.aggregateCondition.isTimeSeries(),
+ this.startTime,
+ this.endTime,
+ this.aggregateCondition.getIntervalMS());
+ }
- @Override
- public List<GroupbyKeyValue> read() throws IOException {
- return this.result.getKeyValues();
- }
+ @Override
+ public List<GroupbyKeyValue> read() throws IOException {
+ return this.result.getKeyValues();
+ }
- public long getFirstTimestamp() {
- return this.result.getStartTimestamp();
- }
+ public long getFirstTimestamp() {
+ return this.result.getStartTimestamp();
+ }
- public long getLastTimestamp() {
- return this.result.getStopTimestamp();
- }
+ public long getLastTimestamp() {
+ return this.result.getStopTimestamp();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java
new file mode 100644
index 0000000..85044ec
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericCoprocessorAggregateQuery.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.storage.hbase.query.aggregate;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityConstants;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.query.GenericQuery;
+import org.apache.eagle.query.QueryConstants;
+import org.apache.eagle.query.aggregate.AggregateCondition;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.raw.GroupbyKey;
+import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
+import org.apache.eagle.query.aggregate.raw.GroupbyValue;
+import org.apache.eagle.query.aggregate.timeseries.PostFlatAggregateSort;
+import org.apache.eagle.query.aggregate.timeseries.SortOption;
+import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator;
+import org.apache.eagle.query.aggregate.timeseries.TimeSeriesPostFlatAggregateSort;
+import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult;
+import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+
+/**
+ * Coprocessor based AggregateQuery
+ * <ol>
+ * <li>Open HBase connection</li>
+ * <li>Aggregate through Coprocessor</li>
+ * <li>Build GroupAggregateQuery.GroupAggregateQueryReader to process result and order as sort options</li>
+ * <li>Return result list</li>
+ * </ol>
+ */
+public class GenericCoprocessorAggregateQuery implements GenericQuery {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericCoprocessorAggregateQuery.class);
+ private final List<AggregateFunctionType> sortFuncs;
+ private final List<String> sortFields;
+
+ private EntityDefinition entityDef;
+ private SearchCondition searchCondition;
+ private AggregateCondition aggregateCondition;
+ private String prefix;
+ private long lastTimestamp = 0;
+ private long firstTimestamp = 0;
+ private List<SortOption> sortOptions;
+ private int top;
+
+ private int aggFuncNum;
+ private int sortAggFuncNum;
+ private int sortFuncNum;
+
+ public GenericCoprocessorAggregateQuery(String serviceName, SearchCondition condition, AggregateCondition aggregateCondition, String metricName)
+ throws InstantiationException, IllegalAccessException {
+ this(serviceName, condition, aggregateCondition, metricName, null, null, null, 0);
+ }
+
+ public GenericCoprocessorAggregateQuery(String serviceName, SearchCondition condition,
+ AggregateCondition aggregateCondition, String metricName,
+ List<SortOption> sortOptions, List<AggregateFunctionType> sortFunctionTypes, List<String> sortFields, int top)
+ throws InstantiationException, IllegalAccessException {
+ checkNotNull(serviceName, "serviceName");
+ this.searchCondition = condition;
+ this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ checkNotNull(entityDef, "EntityDefinition");
+ checkNotNull(entityDef, "GroupAggregateCondition");
+ this.aggregateCondition = aggregateCondition;
+ this.aggFuncNum = this.aggregateCondition.getAggregateFunctionTypes().size();
+ this.sortOptions = sortOptions;
+ this.sortFuncs = sortFunctionTypes;
+ this.sortFuncNum = this.sortOptions == null ? 0 : this.sortOptions.size();
+ this.sortFields = sortFields;
+ this.top = top;
+
+ if (serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("list metric aggregate query");
+ }
+ if (metricName == null || metricName.isEmpty()) {
+ throw new IllegalArgumentException("metricName should not be empty for metric list query");
+ }
+ if (!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)) {
+ condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
+ }
+ this.prefix = metricName;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("list entity aggregate query");
+ }
+ this.prefix = entityDef.getPrefix();
+ }
+
+ // Add sort oriented aggregation functions into aggregateCondtion
+ if (this.sortOptions != null) {
+ // if sort for time series aggregation
+ if (this.aggregateCondition.isTimeSeries()) {
+ this.sortAggFuncNum = 0;
+ int index = 0;
+ for (SortOption sortOption : this.sortOptions) {
+ if (!sortOption.isInGroupby()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add additional aggregation functions for sort options " + sortOption.toString() + " in index: " + (this.aggFuncNum + this.sortAggFuncNum));
+ }
+ AggregateFunctionType _sortFunc = this.sortFuncs.get(index);
+ if (AggregateFunctionType.avg.equals(_sortFunc)) {
+ this.aggregateCondition.getAggregateFunctionTypes().add(AggregateFunctionType.sum);
+ } else {
+ this.aggregateCondition.getAggregateFunctionTypes().add(_sortFunc);
+ }
+ this.aggregateCondition.getAggregateFields().add(this.sortFields.get(index));
+
+ sortOption.setIndex(this.sortAggFuncNum);
+ sortAggFuncNum++;
+ }
+ index++;
+ }
+ }
+ }
+ }
+
+
+ private void checkNotNull(Object o, String message) {
+ if (o == null) {
+ throw new IllegalArgumentException(message + " should not be null");
+ }
+ }
+
+ /**
+ * @see GenericCoprocessorAggregateQuery.TimeSeriesGroupAggregateQueryReader#result()
+ * @see GenericCoprocessorAggregateQuery.FlatGroupAggregateQueryReader#result()
+ */
+ @Override
+ public List result() throws Exception {
+ Date start = null;
+ Date end = null;
+ // shortcut to avoid read when pageSize=0
+ if (searchCondition.getPageSize() <= 0) {
+ return null;
+ }
+ // Process the time range if needed
+ if (entityDef.isTimeSeries()) {
+ start = DateTimeUtil.humanDateToDate(searchCondition.getStartTime());
+ end = DateTimeUtil.humanDateToDate(searchCondition.getEndTime());
+ } else {
+ start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
+ end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
+ }
+ // Generate the output qualifiers
+ final byte[][] outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, searchCondition.getOutputFields());
+ GenericAggregateReader reader = new GenericAggregateReader(entityDef,
+ searchCondition.getPartitionValues(),
+ start, end, searchCondition.getFilter(), searchCondition.getStartRowkey(), outputQualifiers, this.prefix, this.aggregateCondition);
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("open and read group aggregate reader");
+ }
+ reader.open();
+ List result = buildGroupAggregateQueryReader(reader, this.aggregateCondition.isTimeSeries()).result();
+ if (result == null) {
+ throw new IOException("result is null");
+ }
+ this.firstTimestamp = reader.getFirstTimestamp();
+ this.lastTimestamp = reader.getLastTimestamp();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finish read aggregated " + result.size() + " rows");
+ }
+ return result;
+ } catch (IOException ex) {
+ LOG.error("Fail reading aggregated results", ex);
+ throw ex;
+ } finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Release HBase connection");
+ }
+ reader.close();
+ }
+ }
+
+ /**
+ * Factory method for {@link GroupAggregateQueryReader}.
+ * <pre>
+ * {@link GroupAggregateQueryReader}
+ * |_ {@link FlatGroupAggregateQueryReader}
+ * |_ {@link TimeSeriesGroupAggregateQueryReader}
+ * </pre>
+ */
+ private GroupAggregateQueryReader buildGroupAggregateQueryReader(GenericAggregateReader reader, boolean isTimeSeries) throws IOException {
+ if (isTimeSeries) {
+ return new TimeSeriesGroupAggregateQueryReader(reader, this);
+ } else {
+ return new FlatGroupAggregateQueryReader(reader, this);
+ }
+ }
+
+ private abstract class GroupAggregateQueryReader {
+ protected final GenericAggregateReader reader;
+ protected final GenericCoprocessorAggregateQuery query;
+
+ public GroupAggregateQueryReader(GenericAggregateReader reader, GenericCoprocessorAggregateQuery query) {
+ this.reader = reader;
+ this.query = query;
+ }
+
+ public abstract <T> List<T> result() throws Exception;
+
+ protected Map<List<String>, List<Double>> keyValuesToMap(List<GroupbyKeyValue> entities) throws Exception {
+ Map<List<String>, List<Double>> aggResultMap = new HashMap<List<String>, List<Double>>();
+ try {
+ for (GroupbyKeyValue keyValue : entities) {
+ List<String> key = new ArrayList<String>();
+ for (BytesWritable bw : keyValue.getKey().getValue()) {
+ key.add(new String(bw.copyBytes(), QueryConstants.CHARSET));
+ }
+ List<Double> value = new ArrayList<Double>();
+ for (DoubleWritable wa : keyValue.getValue().getValue()) {
+ value.add(wa.get());
+ }
+ aggResultMap.put(key, value);
+ }
+ } catch (UnsupportedEncodingException e) {
+ LOG.error(QueryConstants.CHARSET + " not support: " + e.getMessage(), e);
+ }
+ return aggResultMap;
+ }
+ }
+
+ private class FlatGroupAggregateQueryReader extends GroupAggregateQueryReader {
+ public FlatGroupAggregateQueryReader(GenericAggregateReader reader, GenericCoprocessorAggregateQuery query) {
+ super(reader, query);
+ }
+
+ @Override
+ public List<Map.Entry<List<String>, List<Double>>> result() throws Exception {
+ Map<List<String>, List<Double>> aggResultMap = this.keyValuesToMap(this.reader.read());
+ if (this.query.sortOptions == null) {
+ return new ArrayList<Map.Entry<List<String>, List<Double>>>(aggResultMap.entrySet());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flat sorting");
+ }
+ return PostFlatAggregateSort.sort(aggResultMap, this.query.sortOptions, this.query.top);
+ }
+ }
+
+ private class TimeSeriesGroupAggregateQueryReader extends GroupAggregateQueryReader {
+ private final Date start;
+ private final Date end;
+ private final int pointsNum;
+ private final int aggFuncNum;
+ private final List<SortOption> sortOptions;
+ private final List<AggregateFunctionType> sortFuncs;
+ private final int sortAggFuncNum;
+
+ public TimeSeriesGroupAggregateQueryReader(GenericAggregateReader reader, GenericCoprocessorAggregateQuery query) throws IOException {
+ super(reader, query);
+ try {
+ if (entityDef.isTimeSeries()) {
+ this.start = DateTimeUtil.humanDateToDate(searchCondition.getStartTime());
+ this.end = DateTimeUtil.humanDateToDate(searchCondition.getEndTime());
+ } else {
+ start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME);
+ end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME);
+ }
+ this.pointsNum = (int) ((end.getTime() - 1 - start.getTime()) / this.query.aggregateCondition.getIntervalMS() + 1);
+ this.aggFuncNum = this.query.aggFuncNum;
+ this.sortOptions = this.query.sortOptions;
+ this.sortFuncs = this.query.sortFuncs;
+ this.sortAggFuncNum = this.query.sortAggFuncNum;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * <h2>TimeSeriesReader result</h2>
+ * <ol>
+ * <li>generateTimeSeriesDataPoints()</li>
+ * <li>if not sort options, return generate time series data points</li>
+ * <li>if requiring sort, sort time series data points by order of flat aggregation</li>
+ * </ol>
+ * <h2>Time Series Sort Algorithms</h2>
+ * <ol>
+ * <li>Flat aggregate on grouped fields without time series bucket index</li>
+ * <li>Flat aggregated result according given sortOptions</li>
+ * <li>Sort Time Series Result according the same order of flat aggregated keys</li>
+ * </ol>.
+ */
+ @Override
+ public List<Map.Entry<List<String>, List<double[]>>> result() throws Exception {
+ List<GroupbyKeyValue> result = this.reader.read();
+
+ // aggregated data points only
+ Map<List<String>, List<double[]>> timeseriesDataPoints = convertToTimeSeriesDataPoints(result);
+
+ // return time-series data points without sort
+ if (this.query.sortOptions == null) {
+ return new ArrayList<>(timeseriesDataPoints.entrySet());
+ }
+
+ LOG.info("Time series sorting");
+
+ // Time Series Sort Steps
+ // ======================
+ // 1. Flat aggregate on grouped fields without time series bucket index
+ // 2. Flat aggregated result according given sortOptions
+ // 3. Sort Time Series Result according flat aggregated keys' order
+
+ // 1. Flat aggregate on grouped fields without time series bucket index
+ AggregateResultCallbackImpl callback = new AggregateResultCallbackImpl(this.sortFuncs);
+ for (GroupbyKeyValue kv : result) {
+ ArrayList<BytesWritable> copykey = new ArrayList<BytesWritable>(kv.getKey().getValue());
+ // remove time series bucket index
+ copykey.remove(copykey.size() - 1);
+ GroupbyKey key = new GroupbyKey();
+
+ // [this.aggFuncNum,this.aggFuncNum + this.sortFuncNum)
+ GroupbyValue value = new GroupbyValue();
+ for (int i = this.aggFuncNum; i < this.aggFuncNum + this.sortAggFuncNum; i++) {
+ value.add(kv.getValue().get(i));
+ value.addMeta(kv.getValue().getMeta(i));
+ }
+ key.addAll(copykey);
+ GroupbyKeyValue keyValue = new GroupbyKeyValue(key, value);
+ callback.update(keyValue);
+ }
+ AggregateResult callbackResult = callback.result();
+ Map<List<String>, List<Double>> mapForSort = this.keyValuesToMap(callbackResult.getKeyValues());
+
+ // 2. Flat aggregated result according given sortOptions
+ // List<Map.Entry<List<String>, List<Double>>> flatSort = PostFlatAggregateSort.sort(mapForSort , this.sortOptions, Integer.MAX_VALUE);
+ // mapForSort = new HashMap<List<String>, List<Double>>();
+ // for(Map.Entry<List<String>, List<Double>> entry:flatSort){ mapForSort.put(entry.getKey(),entry.getValue()); }
+
+ // 3. Sort Time Series Result according flat aggregated keys' order
+ return TimeSeriesPostFlatAggregateSort.sort(mapForSort, timeseriesDataPoints, this.sortOptions, this.query.top);
+ }
+
+ /**
+ * Convert raw GroupbyKeyValue list into time-series data points hash map.
+ *
+ * @param result <code>List<GroupbyKeyValue></code>
+ * @return Map<List<String>,List<double[]>>
+ * @throws Exception
+ */
+ private Map<List<String>, List<double[]>> convertToTimeSeriesDataPoints(List<GroupbyKeyValue> result) throws Exception {
+ Map<List<String>, List<Double>> aggResultMap = this.keyValuesToMap(result);
+ Map<List<String>, List<double[]>> timeseriesDataPoints = TimeSeriesAggregator.toMetric(aggResultMap, this.pointsNum, this.aggFuncNum);
+ return timeseriesDataPoints;
+ }
+ }
+
+ /**
+ * Get last / max timestamp.
+ *
+ * @return lastTimestamp
+ */
+ @Override
+ public long getLastTimestamp() {
+ return this.lastTimestamp;
+ }
+
+ /**
+ * Get first / min timestamp.
+ *
+ * @return firstTimestamp
+ */
+ @Override
+ public long getFirstTimeStamp() {
+ return this.firstTimestamp;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericInMemoryAggregateQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericInMemoryAggregateQuery.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericInMemoryAggregateQuery.java
new file mode 100644
index 0000000..667390c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericInMemoryAggregateQuery.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.storage.hbase.query.aggregate;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.log.entity.*;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.apache.eagle.query.GenericQuery;
+import org.apache.eagle.query.aggregate.AggregateCondition;
+import org.apache.eagle.query.aggregate.AggregateFunctionType;
+import org.apache.eagle.query.aggregate.timeseries.*;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class GenericInMemoryAggregateQuery implements GenericQuery {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericCoprocessorAggregateQuery.class);
+ private final List<AggregateFunctionType> sortFuncs;
+ private final List<String> sortFields;
+ private final String serviceName;
+
+ private EntityDefinition entityDef;
+ private SearchCondition searchCondition;
+ private AggregateCondition aggregateCondition;
+ private String prefix;
+ private long lastTimestamp = 0;
+ private long firstTimestamp = 0;
+ private List<SortOption> sortOptions;
+ private int top;
+
+ private int aggFuncNum;
+ private int sortAggFuncNum;
+ private int sortFuncNum;
+
+ public GenericInMemoryAggregateQuery(String serviceName, SearchCondition condition,
+ AggregateCondition aggregateCondition, String metricName,
+ List<SortOption> sortOptions, List<AggregateFunctionType> sortFunctionTypes, List<String> sortFields, int top)
+ throws InstantiationException, IllegalAccessException {
+ checkNotNull(serviceName, "serviceName");
+ this.searchCondition = condition;
+ this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ checkNotNull(entityDef, "EntityDefinition");
+ checkNotNull(entityDef, "GroupAggregateCondition");
+ this.aggregateCondition = aggregateCondition;
+ this.aggFuncNum = this.aggregateCondition.getAggregateFunctionTypes().size();
+ this.sortOptions = sortOptions;
+ this.sortFuncs = sortFunctionTypes;
+ this.sortFuncNum = this.sortOptions == null ? 0 : this.sortOptions.size();
+ this.sortFields = sortFields;
+ this.top = top;
+ this.serviceName = serviceName;
+
+ if (serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("list metric aggregate query");
+ }
+ if (metricName == null || metricName.isEmpty()) {
+ throw new IllegalArgumentException("metricName should not be empty for metric list query");
+ }
+ if (!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)) {
+ condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
+ }
+ this.prefix = metricName;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("list entity aggregate query");
+ }
+ this.prefix = entityDef.getPrefix();
+ }
+
+ // Add sort oriented aggregation functions into aggregateCondtion
+ if (this.sortOptions != null) {
+ // if sort for time series aggregation
+ if (this.aggregateCondition.isTimeSeries()) {
+ this.sortAggFuncNum = 0;
+ int index = 0;
+ for (SortOption sortOption : this.sortOptions) {
+ if (!sortOption.isInGroupby()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add additional aggregation functions for sort options " + sortOption.toString() + " in index: " + (this.aggFuncNum + this.sortAggFuncNum));
+ }
+ AggregateFunctionType _sortFunc = this.sortFuncs.get(index);
+ if (AggregateFunctionType.avg.equals(_sortFunc)) {
+ this.aggregateCondition.getAggregateFunctionTypes().add(AggregateFunctionType.sum);
+ } else {
+ this.aggregateCondition.getAggregateFunctionTypes().add(_sortFunc);
+ }
+ this.aggregateCondition.getAggregateFields().add(this.sortFields.get(index));
+
+ sortOption.setIndex(this.sortAggFuncNum);
+ sortAggFuncNum++;
+ }
+ index++;
+ }
+ }
+ }
+ }
+
+ private void checkNotNull(Object o, String message) {
+ if (o == null) {
+ throw new IllegalArgumentException(message + " should not be null");
+ }
+ }
+
+ @Override
+ public <T> List<T> result() throws Exception {
+ // non time-series based aggregate query, not hierarchical
+ final List<String> groupbyFields = aggregateCondition.getGroupbyFields();
+ final List<String> aggregateFields = aggregateCondition.getAggregateFields();
+ final List<String> filterFields = searchCondition.getOutputFields();
+ final List<String> outputFields = new ArrayList<>();
+ if (groupbyFields != null) {
+ outputFields.addAll(groupbyFields);
+ }
+ if (filterFields != null) {
+ outputFields.addAll(filterFields);
+ }
+ if (sortFields != null) {
+ outputFields.addAll(sortFields);
+ }
+ outputFields.addAll(aggregateFields);
+ searchCondition.setOutputFields(outputFields);
+
+ if (searchCondition.isOutputAll()) {
+ LOG.info("Output: ALL");
+ } else {
+ LOG.info("Output: " + StringUtils.join(searchCondition.getOutputFields(), ", "));
+ }
+
+ if (!this.aggregateCondition.isTimeSeries()) {
+ FlatAggregator agg = new FlatAggregator(groupbyFields, aggregateCondition.getAggregateFunctionTypes(), aggregateCondition.getAggregateFields());
+ StreamReader reader = null;
+ if (this.entityDef.getMetricDefinition() == null) {
+ reader = new GenericEntityStreamReader(serviceName, searchCondition);
+ } else { // metric aggregation need metric reader
+ reader = new GenericMetricEntityDecompactionStreamReader(this.prefix, searchCondition);
+ }
+ reader.register(agg);
+ reader.readAsStream();
+ ArrayList<Map.Entry<List<String>, List<Double>>> obj = new ArrayList<>();
+ obj.addAll(agg.result().entrySet());
+ this.firstTimestamp = reader.getFirstTimestamp();
+ this.lastTimestamp = reader.getLastTimestamp();
+ if (this.sortOptions == null) {
+ return (List<T>) obj;
+ } else { // has sort options
+ return (List<T>) PostFlatAggregateSort.sort(agg.result(), this.sortOptions, top);
+ }
+ } else {
+ StreamReader reader;
+ if (entityDef.getMetricDefinition() == null) {
+ reader = new GenericEntityStreamReader(serviceName, searchCondition);
+ } else {
+ reader = new GenericMetricEntityDecompactionStreamReader(this.prefix, searchCondition);
+ }
+ TimeSeriesAggregator tsAgg = new TimeSeriesAggregator(groupbyFields,
+ aggregateCondition.getAggregateFunctionTypes(), aggregateFields,
+ DateTimeUtil.humanDateToDate(searchCondition.getStartTime()).getTime(),
+ DateTimeUtil.humanDateToDate(searchCondition.getEndTime()).getTime(), aggregateCondition.getIntervalMS() * 60 * 1000);
+ reader.register(tsAgg);
+
+ // for sorting
+ FlatAggregator sortAgg = null;
+ if (sortOptions != null) {
+ sortAgg = new FlatAggregator(groupbyFields, sortFuncs, sortFields);
+ reader.register(sortAgg);
+ }
+ reader.readAsStream();
+ ArrayList<Map.Entry<List<String>, List<double[]>>> obj = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+ obj.addAll(tsAgg.getMetric().entrySet());
+
+ this.firstTimestamp = reader.getFirstTimestamp();
+ this.lastTimestamp = reader.getLastTimestamp();
+ if (sortOptions == null) {
+ return (List<T>) obj;
+ } else { // has sort options
+ return (List<T>) TimeSeriesPostFlatAggregateSort.sort(sortAgg.result(), tsAgg.getMetric(), this.sortOptions, top);
+ }
+ }
+ }
+
+ @Override
+ public long getLastTimestamp() {
+ return this.firstTimestamp;
+ }
+
+ @Override
+ public long getFirstTimeStamp() {
+ return this.lastTimestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
index cd6a5b9..828da44 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateClient.java
@@ -25,35 +25,27 @@ import java.io.IOException;
import java.util.List;
/**
- * Coprocessor-based Aggregation Universal Client Interface
+ * Coprocessor-based Aggregation Universal Client Interface.
*
* <h2>Flat or RAW Aggregation:</h2>
- * <pre>
- * AggregateResult aggregate( HTableInterface table, String serviceName, Scan scan, List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields) throws IOException
- * </pre>
- *
+ * @see #aggregate(HTableInterface, EntityDefinition, Scan, List, List, List)
* <h2>Time Series Aggregation:</h2>
- * <pre>
- * AggregateResult aggregate(HTableInterface table, String serviceName, Scan scan, List<String> groupbyFields, List<AggregateFunctionType> aggregateFuncTypes, List<String> aggregatedFields, boolean timeSeries, long startTime, long endTime, long intervalMin) throws IOException
- * </pre>
- * @since : 11/3/14,2014
+ * @see #aggregate(HTableInterface, EntityDefinition, Scan, List, List, List, boolean, long, long, long)
+ *
*/
-public interface AggregateClient
-{
+public interface AggregateClient {
- /**
- * Flat Aggregation
- *
- *
- * @param table HTable connections
- * @param scan HBase Scan
- * @param groupbyFields Grouped by fields name
- * @param aggregateFuncTypes Aggregate function types
- * @param aggregatedFields Aggregate field names
- * @return Return AggregateResult
- * @throws Exception
- */
- AggregateResult aggregate(final HTableInterface table, // HTable connections
+ /**
+ * Flat Aggregation.
+ *
+ * @param table HTable connections
+ * @param scan HBase Scan
+ * @param groupbyFields Grouped by fields name
+ * @param aggregateFuncTypes Aggregate function types
+ * @param aggregatedFields Aggregate field names
+ * @return Return AggregateResult
+ */
+ AggregateResult aggregate(final HTableInterface table, // HTable connections
final EntityDefinition entityDefinition, // Eagle service name
final Scan scan, // HBase Scan
final List<String> groupbyFields, // Grouped by fields name
@@ -61,21 +53,20 @@ public interface AggregateClient
final List<String> aggregatedFields // Aggregate field names
) throws IOException;
- /**
- * Time Series Aggregation
- *
- * @param table HTable connections
- * @param entityDefinition Eagle EntityDefinition
- * @param scan HBase Scan
- * @param groupbyFields Grouped by fields name
- * @param aggregateFuncTypes Aggregate function types
- * @param aggregatedFields Aggregate field names
- * @param timeSeries Is time series aggregations?
- * @param intervalMin The interval in minutes if it's time series aggregation
- * @return Return AggregateResult
- * @throws Exception
- */
- AggregateResult aggregate(final HTableInterface table, // HTable connections
+ /**
+ * Time Series Aggregation.
+ *
+ * @param table HTable connections
+ * @param entityDefinition Eagle EntityDefinition
+ * @param scan HBase Scan
+ * @param groupbyFields Grouped by fields name
+ * @param aggregateFuncTypes Aggregate function types
+ * @param aggregatedFields Aggregate field names
+ * @param timeSeries Is time series aggregations?
+ * @param intervalMin The interval in minutes if it's time series aggregation
+ * @return Return AggregateResult
+ */
+ AggregateResult aggregate(final HTableInterface table, // HTable connections
final EntityDefinition entityDefinition, // Eagle service name
final Scan scan, // HBase Scan
final List<String> groupbyFields, // Grouped by fields name
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
index fe2c414..eb85046 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocol.java
@@ -29,35 +29,32 @@ import java.util.List;
*/
public interface AggregateProtocol {
- /**
- *
- *
- * @param entityDefinition
- * @param scan
- * @param groupbyFields
- * @param aggregateFuncTypes
- * @param aggregatedFields
- * @return AggregateResult
- * @throws java.io.IOException
- */
- AggregateResult aggregate(EntityDefinition entityDefinition,
+ /**
+ * @param entityDefinition
+ * @param scan
+ * @param groupbyFields
+ * @param aggregateFuncTypes
+ * @param aggregatedFields
+ * @return AggregateResult
+ * @throws java.io.IOException
+ */
+ AggregateResult aggregate(EntityDefinition entityDefinition,
Scan scan,
List<String> groupbyFields,
List<byte[]> aggregateFuncTypes,
List<String> aggregatedFields) throws IOException;
- /**
- *
- * @param entityDefinition
- * @param scan
- * @param groupbyFields
- * @param aggregateFuncTypes
- * @param aggregatedFields
- * @param intervalMin
- * @return AggregateResult
- * @throws java.io.IOException
- */
- AggregateResult aggregate(EntityDefinition entityDefinition,
+ /**
+ * @param entityDefinition
+ * @param scan
+ * @param groupbyFields
+ * @param aggregateFuncTypes
+ * @param aggregatedFields
+ * @param intervalMin
+ * @return AggregateResult
+ * @throws java.io.IOException
+ */
+ AggregateResult aggregate(EntityDefinition entityDefinition,
Scan scan,
List<String> groupbyFields,
List<byte[]> aggregateFuncTypes,