You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ja...@apache.org on 2018/02/07 07:05:49 UTC
[5/7] eagle git commit: [EAGLE-1080] Fix checkstyle errors in the
eagle-query-base module
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
index e12fea3..15e89ad 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
@@ -26,36 +26,36 @@ import org.apache.eagle.query.aggregate.AggregateFunctionType;
/**
* Not thread safe
*/
-public class FlatAggregator extends AbstractAggregator{
- protected GroupbyBucket bucket;
+public class FlatAggregator extends AbstractAggregator {
+ protected GroupbyBucket bucket;
/**
* @param groupbyFields
* @param aggregateFuntionTypes
* @param aggregatedFields
*/
- public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
- super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
- bucket = new GroupbyBucket(this.aggregateFunctionTypes);
- }
-
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- List<String> groupbyFieldValues = createGroup(entity);
- List<Double> preAggregatedValues = createPreAggregatedValues(entity);
- bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
- }
-
- public Map<List<String>, List<Double>> result(){
- return bucket.result();
- }
-
- protected List<String> createGroup(TaggedLogAPIEntity entity){
- List<String> groupbyFieldValues = new ArrayList<String>();
- int i = 0;
- for(String groupbyField : groupbyFields){
- String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
- groupbyFieldValues.add(groupbyFieldValue);
- }
- return groupbyFieldValues;
- }
+ public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+ super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+ bucket = new GroupbyBucket(this.aggregateFunctionTypes);
+ }
+
+ public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+ List<String> groupbyFieldValues = createGroup(entity);
+ List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+ bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+ }
+
+ public Map<List<String>, List<Double>> result() {
+ return bucket.result();
+ }
+
+ protected List<String> createGroup(TaggedLogAPIEntity entity) {
+ List<String> groupbyFieldValues = new ArrayList<String>();
+ int i = 0;
+ for (String groupbyField : groupbyFields) {
+ String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
+ groupbyFieldValues.add(groupbyFieldValue);
+ }
+ return groupbyFieldValues;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
index ea57edb..93e65a9 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
@@ -31,211 +31,230 @@ import java.util.List;
import java.util.Map;
public class GroupbyBucket {
- private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
-
- public static Map<String, FunctionFactory> _functionFactories =
- new HashMap<>();
-
- // TODO put this logic to AggregatorFunctionType
- static{
- _functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
- _functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
- _functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
- _functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
- _functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
- }
-
- private List<AggregateFunctionType> types;
-// private SortedMap<List<String>, List<Function>> group2FunctionMap =
-// new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
-
- private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
-
- public GroupbyBucket(List<AggregateFunctionType> types){
- this.types = types;
- }
-
- public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){
- // LOG.info("DEBUG: addDatapoint: groupby=["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
-
- // locate groupby bucket
- List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
- if(functions == null){
- functions = new ArrayList<Function>();
- for(AggregateFunctionType type : types){
- functions.add(_functionFactories.get(type.name()).createFunction());
- }
- group2FunctionMap.put(groupbyFieldValues, functions);
- }
- int functionIndex = 0;
- for(Double v : values){
- functions.get(functionIndex).run(v);
- functionIndex++;
- }
- }
-
- public Map<List<String>, List<Double>> result(){
- Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
- for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
- List<Double> values = new ArrayList<Double>();
- for(Function f : entry.getValue()){
- values.add(f.result());
- }
- result.put(entry.getKey(), values);
- }
- return result;
- }
-
- public List<GroupbyKeyValue> getGroupbyKeyValue(){
- List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
-
- for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
- GroupbyKey key = new GroupbyKey();
- for(String keyStr:entry.getKey()){
- try {
- key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
- } catch (UnsupportedEncodingException e) {
- LOG.error(e.getMessage(),e);
- }
- }
- GroupbyValue value = new GroupbyValue();
- for(Function f : entry.getValue()){
- value.add(f.result());
- value.addMeta(f.count());
- }
- results.add(new GroupbyKeyValue(key,value));
- }
-
- return results;
- }
-
- public static interface FunctionFactory{
- public Function createFunction();
- }
-
- public static abstract class Function{
- protected int count;
-
- public abstract void run(double v);
- public abstract double result();
- public int count(){
- return count;
- }
- public void incrCount(){
- count ++;
- }
- }
-
- private static class CountFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Count();
- }
- }
-
-
- private static class Count extends Sum{
- public Count(){
- super();
- }
- }
-
- private static class SumFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Sum();
- }
- }
-
- private static class Sum extends Function{
- private double summary;
- public Sum(){
- this.summary = 0.0;
- }
- @Override
- public void run(double v){
- this.incrCount();
- this.summary += v;
- }
-
- @Override
- public double result(){
- return this.summary;
- }
- }
-
- private static class MinFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Min();
- }
- }
- public static class Min extends Function{
- private double minimum;
- public Min(){
- // TODO is this a bug, or only positive numeric calculation is supported
- this.minimum = Double.MAX_VALUE;
- }
-
- @Override
- public void run(double v){
- if(v < minimum){
- minimum = v;
- }
- this.incrCount();
- }
-
- @Override
- public double result(){
- return minimum;
- }
- }
-
- private static class MaxFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Max();
- }
- }
- public static class Max extends Function{
- private double maximum;
- public Max(){
- // TODO is this a bug, or only positive numeric calculation is supported
- this.maximum = 0.0;
- }
- @Override
- public void run(double v){
- if(v > maximum){
- maximum = v;
- }
- this.incrCount();
- }
-
- @Override
- public double result(){
- return maximum;
- }
- }
-
- private static class AvgFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Avg();
- }
- }
- public static class Avg extends Function{
- private double total;
- public Avg(){
- this.total = 0.0;
- }
- @Override
- public void run(double v){
- total += v;
- this.incrCount();
- }
- @Override
- public double result(){
- return this.total/this.count;
- }
- }
+ private static final Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
+
+ public static Map<String, FunctionFactory> functionFactories = new HashMap<>();
+
+ // TODO put this logic to AggregatorFunctionType
+ static {
+ functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
+ functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
+ functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
+ functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
+ functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
+ }
+
+ private List<AggregateFunctionType> types;
+ // private SortedMap<List<String>, List<Function>> group2FunctionMap =
+ // new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
+
+ private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
+
+ public GroupbyBucket(List<AggregateFunctionType> types) {
+ this.types = types;
+ }
+
+ public void addDatapoint(List<String> groupbyFieldValues, List<Double> values) {
+ // LOG.info("DEBUG: addDatapoint: groupby =["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
+
+ // locate groupby bucket
+ List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
+ if (functions == null) {
+ functions = new ArrayList<Function>();
+ for (AggregateFunctionType type : types) {
+ functions.add(functionFactories.get(type.name()).createFunction());
+ }
+ group2FunctionMap.put(groupbyFieldValues, functions);
+ }
+ int functionIndex = 0;
+ for (Double v : values) {
+ functions.get(functionIndex).run(v);
+ functionIndex++;
+ }
+ }
+
+ public Map<List<String>, List<Double>> result() {
+ Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
+ for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+ List<Double> values = new ArrayList<Double>();
+ for (Function f : entry.getValue()) {
+ values.add(f.result());
+ }
+ result.put(entry.getKey(), values);
+ }
+ return result;
+ }
+
+ public List<GroupbyKeyValue> getGroupbyKeyValue() {
+ List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
+
+ for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+ GroupbyKey key = new GroupbyKey();
+ for (String keyStr:entry.getKey()) {
+ try {
+ key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
+ } catch (UnsupportedEncodingException e) {
+ LOG.error(e.getMessage(),e);
+ }
+ }
+ GroupbyValue value = new GroupbyValue();
+ for (Function f : entry.getValue()) {
+ value.add(f.result());
+ value.addMeta(f.count());
+ }
+ results.add(new GroupbyKeyValue(key,value));
+ }
+
+ return results;
+ }
+
+ public static interface FunctionFactory {
+ public Function createFunction();
+ }
+
+ public abstract static class Function {
+ protected int count;
+
+ public abstract void run(double v);
+
+ public abstract double result();
+
+ public int count() {
+ return count;
+ }
+
+ public void incrCount() {
+ count ++;
+ }
+ }
+
+ private static class CountFactory implements FunctionFactory {
+
+ @Override
+ public Function createFunction() {
+ return new Count();
+ }
+ }
+
+
+ private static class Count extends Sum {
+
+ public Count() {
+ super();
+ }
+ }
+
+ private static class SumFactory implements FunctionFactory {
+
+ @Override
+ public Function createFunction() {
+ return new Sum();
+ }
+ }
+
+ private static class Sum extends Function {
+ private double summary;
+
+ public Sum() {
+ this.summary = 0.0;
+ }
+
+ @Override
+ public void run(double v) {
+ this.incrCount();
+ this.summary += v;
+ }
+
+ @Override
+ public double result() {
+ return this.summary;
+ }
+ }
+
+ private static class MinFactory implements FunctionFactory {
+
+ @Override
+ public Function createFunction() {
+ return new Min();
+ }
+ }
+
+ public static class Min extends Function {
+ private double minimum;
+
+ public Min() {
+ // TODO is this a bug, or is only positive numeric calculation supported? Seeded with Double.MAX_VALUE, so result() returns that seed if run() is never called
+ this.minimum = Double.MAX_VALUE;
+ }
+
+ @Override
+ public void run(double v) {
+ if (v < minimum) {
+ minimum = v;
+ }
+ this.incrCount();
+ }
+
+ @Override
+ public double result() {
+ return minimum;
+ }
+ }
+
+ private static class MaxFactory implements FunctionFactory {
+
+ @Override
+ public Function createFunction() {
+ return new Max();
+ }
+ }
+
+ public static class Max extends Function {
+ private double maximum;
+
+ public Max() {
+ // TODO is this a bug, or is only non-negative numeric calculation supported? Seeded with 0.0, so result() returns 0.0 when every input is negative
+ this.maximum = 0.0;
+ }
+
+ @Override
+ public void run(double v) {
+ if (v > maximum) {
+ maximum = v;
+ }
+ this.incrCount();
+ }
+
+ @Override
+ public double result() {
+ return maximum;
+ }
+ }
+
+ private static class AvgFactory implements FunctionFactory {
+
+ @Override
+ public Function createFunction() {
+ return new Avg();
+ }
+ }
+
+ public static class Avg extends Function {
+ private double total;
+
+ public Avg() {
+ this.total = 0.0;
+ }
+
+ @Override
+ public void run(double v) {
+ total += v;
+ this.incrCount();
+ }
+
+ @Override
+ public double result() {
+ return this.total / this.count;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
index 6635483..b612ccf 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
@@ -22,18 +22,21 @@ import java.util.List;
/**
* this is default comparator for aggregation. The behavior is to sort by groupby fields ascendantly
*/
-public class GroupbyFieldsComparator implements Comparator<List<String>>{
- @Override
- public int compare(List<String> list1, List<String> list2){
- if(list1 == null || list2 == null || list1.size() != list2.size())
- throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
- int r = 0;
- int index = 0;
- for(String s1 : list1){
- r = s1.compareTo(list2.get(index++));
- if(r != 0)
- return r;
- }
- return r;
- }
+public class GroupbyFieldsComparator implements Comparator<List<String>> {
+
+ @Override
+ public int compare(List<String> list1, List<String> list2) {
+ if (list1 == null || list2 == null || list1.size() != list2.size()) {
+ throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
+ }
+ int r = 0;
+ int index = 0;
+ for (String s1 : list1) {
+ r = s1.compareTo(list2.get(index++));
+ if (r != 0) {
+ return r;
+ }
+ }
+ return r;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
index 341fa00..559061b 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
@@ -25,43 +25,52 @@ import java.util.TreeMap;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class HierarchicalAggregateEntity {
- private String key;
- private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
- private List<Double> values = new ArrayList<Double>();
- private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
- private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
+ private String key;
+ private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
+ private List<Double> values = new ArrayList<Double>();
+ private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
+ private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
- public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
- return sortedList;
- }
- public void setSortedList(
- SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
- this.sortedList = sortedList;
- }
- public List<GroupbyBucket.Function> getTmpValues() {
- return tmpValues;
- }
- public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
- this.tmpValues = tmpValues;
- }
- public String getKey() {
- return key;
- }
- public void setKey(String key) {
- this.key = key;
- }
- public List<Double> getValues() {
- return values;
- }
- public void setValues(List<Double> values) {
- this.values = values;
- }
- public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
- return children;
- }
- public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
- this.children = children;
- }
+ public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
+ return sortedList;
+ }
+
+ public void setSortedList(
+ SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
+ this.sortedList = sortedList;
+ }
+
+ public List<GroupbyBucket.Function> getTmpValues() {
+ return tmpValues;
+ }
+
+ public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
+ this.tmpValues = tmpValues;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public List<Double> getValues() {
+ return values;
+ }
+
+ public void setValues(List<Double> values) {
+ this.values = values;
+ }
+
+ public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
+ return children;
+ }
+
+ public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
+ this.children = children;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
index ecb80ac..8751a74 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
@@ -22,61 +22,61 @@ import java.util.SortedMap;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.query.aggregate.AggregateFunctionType;
-public class HierarchicalAggregator extends AbstractAggregator{
- private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
+public class HierarchicalAggregator extends AbstractAggregator {
+ private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
- public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
- super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
- }
+ public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+ super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+ }
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- List<Double> preAggregatedValues = createPreAggregatedValues(entity);
- // aggregate to root first
- addDatapoint(root, preAggregatedValues);
- // go through hierarchical tree
- HierarchicalAggregateEntity current = root;
- int i = 0;
- for(String groupbyField : groupbyFields){
- // determine groupbyFieldValue from tag or fields
- String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
- SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
- if(children.get(groupbyFieldValue) == null){
- HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
- children.put(groupbyFieldValue, tmp);
- }
- children.get(groupbyFieldValue).setKey(groupbyFieldValue);
- addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
- current = children.get(groupbyFieldValue);
- }
- }
+ public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+ List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+ // aggregate to root first
+ addDatapoint(root, preAggregatedValues);
+ // go through hierarchical tree
+ HierarchicalAggregateEntity current = root;
+ int i = 0;
+ for (String groupbyField : groupbyFields) {
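+ // determine groupbyFieldValue from tag or fields. NOTE(review): i is never incremented in this loop, while FlatAggregator.createGroup passes i++ - confirm whether the index should advance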
+ String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
+ SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
+ if (children.get(groupbyFieldValue) == null) {
+ HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
+ children.put(groupbyFieldValue, tmp);
+ }
+ children.get(groupbyFieldValue).setKey(groupbyFieldValue);
+ addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
+ current = children.get(groupbyFieldValue);
+ }
+ }
- private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){
- List<GroupbyBucket.Function> functions = entity.getTmpValues();
- // initialize list of function
- if(functions.isEmpty()){
- for(AggregateFunctionType type : aggregateFunctionTypes){
- functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction());
- }
- }
- int functionIndex = 0;
- for(Double v : values){
- functions.get(functionIndex).run(v);
- functionIndex++;
- }
- }
+ private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values) {
+ List<GroupbyBucket.Function> functions = entity.getTmpValues();
+ // initialize list of function
+ if (functions.isEmpty()) {
+ for (AggregateFunctionType type : aggregateFunctionTypes) {
+ functions.add(GroupbyBucket.functionFactories.get(type.name()).createFunction());
+ }
+ }
+ int functionIndex = 0;
+ for (Double v : values) {
+ functions.get(functionIndex).run(v);
+ functionIndex++;
+ }
+ }
- private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){
- for(GroupbyBucket.Function f : entity.getTmpValues()){
- entity.getValues().add(f.result());
- }
- for(HierarchicalAggregateEntity child : entity.getChildren().values()){
- finalizeHierarchicalAggregateEntity(child);
- }
- entity.setTmpValues(null);
- }
+ private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity) {
+ for (GroupbyBucket.Function f : entity.getTmpValues()) {
+ entity.getValues().add(f.result());
+ }
+ for (HierarchicalAggregateEntity child : entity.getChildren().values()) {
+ finalizeHierarchicalAggregateEntity(child);
+ }
+ entity.setTmpValues(null);
+ }
- public HierarchicalAggregateEntity result(){
- finalizeHierarchicalAggregateEntity(root);
- return this.root;
- }
+ public HierarchicalAggregateEntity result() {
+ finalizeHierarchicalAggregateEntity(root);
+ return this.root;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
index f62d2c2..8ca24c6 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
@@ -24,70 +24,74 @@ import java.util.SortedSet;
import java.util.TreeSet;
public class PostFlatAggregateSort {
- private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
- SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
- sortedEntries.addAll(map.entrySet());
- return sortedEntries;
- }
+ private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
+ SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
+ sortedEntries.addAll(map.entrySet());
+ return sortedEntries;
+ }
- /**
- * sort aggregated results with sort options
- * @param aggregatedResult aggregated result set, but it is not sorted
- * @sortOptions sorting options
- * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
- */
- public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){
- SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
- List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
- for (Map.Entry<List<String>, List<Double>> entry : allList) {
- result.add(entry);
- if (topN > 0 && result.size() >= topN) {
- break;
- }
- }
- return result;
- }
+ /**
+ * sort aggregated results with sort options
+ * @param aggregatedResult aggregated result set, but it is not sorted
+ * @param sortOptions sorting options
+ * @param topN maximum number of results to return; if it is not positive (default value 0), all results will be returned
+ */
+ public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN) {
+ SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
+ List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+ for (Map.Entry<List<String>, List<Double>> entry : allList) {
+ result.add(entry);
+ if (topN > 0 && result.size() >= topN) {
+ break;
+ }
+ }
+ return result;
+ }
+
+ private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> {
+ private List<SortOption> sortOptions;
+
+ public MapEntryComparator(List<SortOption> sortOptions) {
+ this.sortOptions = sortOptions;
+ }
- private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{
- private List<SortOption> sortOptions;
- public MapEntryComparator(List<SortOption> sortOptions){
- this.sortOptions = sortOptions;
- }
- /**
- * default to sort by all groupby fields
- */
- @Override
- public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){
- int r = 0;
- List<String> keyList1 = e1.getKey();
- List<Double> valueList1 = e1.getValue();
- List<String> keyList2 = e2.getKey();
- List<Double> valueList2 = e2.getValue();
- for(SortOption so : sortOptions){
- int index = so.getIndex();
- if (index == -1) {
- continue;
- }
- if(!so.isInGroupby()){ // sort fields come from functions
- Double value1 = valueList1.get(index);
- Double value2 = valueList2.get(index);
- r = value1.compareTo(value2);
- }else{ // sort fields come from groupby fields
- String key1 = keyList1.get(index);
- String key2 = keyList2.get(index);
- r = key1.compareTo(key2);
- }
- if(r == 0) continue;
- if(!so.isAscendant()){
- r = -r;
- }
- return r;
- }
- // default to sort by groupby fields ascendently
- if(r ==0){ // TODO is this check necessary
- return new GroupbyFieldsComparator().compare(keyList1, keyList2);
- }
- return r;
+ /**
+ * default to sort by all groupby fields
+ */
+ @Override
+ public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2) {
+ int r = 0;
+ List<String> keyList1 = e1.getKey();
+ List<Double> valueList1 = e1.getValue();
+ List<String> keyList2 = e2.getKey();
+ List<Double> valueList2 = e2.getValue();
+ for (SortOption so : sortOptions) {
+ int index = so.getIndex();
+ if (index == -1) {
+ continue;
+ }
+ if (!so.isInGroupby()) { // sort fields come from functions
+ Double value1 = valueList1.get(index);
+ Double value2 = valueList2.get(index);
+ r = value1.compareTo(value2);
+ } else { // sort fields come from groupby fields
+ String key1 = keyList1.get(index);
+ String key2 = keyList2.get(index);
+ r = key1.compareTo(key2);
+ }
+ if (r == 0) {
+ continue;
+ }
+ if (!so.isAscendant()) {
+ r = -r;
+ }
+ return r;
+ }
+ // default to sorting by groupby fields in ascending order
+ if (r == 0) { // TODO is this check necessary
+ return new GroupbyFieldsComparator().compare(keyList1, keyList2);
+ }
+ return r;
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
index 7b0997b..bd475f9 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
@@ -24,69 +24,71 @@ import java.util.TreeSet;
public class PostHierarchicalAggregateSort {
- private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
- SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
- sortedEntries.addAll(entity.getChildren().entrySet());
- return sortedEntries;
- }
+ private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
+ SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
+ sortedEntries.addAll(entity.getChildren().entrySet());
+ return sortedEntries;
+ }
- /**
- * sort aggregated results with sort options
+ /**
+ * sort aggregated results with sort options
*
* @param result
* @param sortOptions
* @return
*/
- public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){
- SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
- result.setSortedList(tmp);
- result.setChildren(null);
- for(Map.Entry<String, HierarchicalAggregateEntity> entry : tmp){
- sort(entry.getValue(), sortOptions);
- }
- return result;
- }
+ public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions) {
+ SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
+ result.setSortedList(tmp);
+ result.setChildren(null);
+ for (Map.Entry<String, HierarchicalAggregateEntity> entry : tmp) {
+ sort(entry.getValue(), sortOptions);
+ }
+ return result;
+ }
+
+ private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>> {
+ private List<SortOption> sortOptions;
- private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{
- private List<SortOption> sortOptions;
+ public MapEntryComparator(List<SortOption> sortOptions) {
+ this.sortOptions = sortOptions;
+ }
- public MapEntryComparator(List<SortOption> sortOptions){
- this.sortOptions = sortOptions;
- }
+ /**
+ * default to sort by all groupby fields
+ */
+ @Override
+ public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2) {
+ int r = 0;
+ String key1 = e1.getKey();
+ List<Double> valueList1 = e1.getValue().getValues();
+ String key2 = e2.getKey();
+ List<Double> valueList2 = e2.getValue().getValues();
+ for (SortOption so : sortOptions) {
+ int index = so.getIndex();
+ if (index == -1) {
+ continue;
+ }
+ if (!so.isInGroupby()) { // sort fields come from functions
+ Double value1 = valueList1.get(index);
+ Double value2 = valueList2.get(index);
+ r = value1.compareTo(value2);
+ }
+ // sort fields come from groupby fields, then silently ignored
- /**
- * default to sort by all groupby fields
- */
- @Override
- public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){
- int r = 0;
- String key1 = e1.getKey();
- List<Double> valueList1 = e1.getValue().getValues();
- String key2 = e2.getKey();
- List<Double> valueList2 = e2.getValue().getValues();
- for(SortOption so : sortOptions){
- int index = so.getIndex();
- if (index == -1) {
- continue;
- }
- if(!so.isInGroupby()){ // sort fields come from functions
- Double value1 = valueList1.get(index);
- Double value2 = valueList2.get(index);
- r = value1.compareTo(value2);
- }
- // sort fields come from groupby fields, then silently ignored
-
- if(r == 0) continue;
- if(!so.isAscendant()){
- r = -r;
- }
- return r;
- }
- // default to sort by groupby fields ascendently
- if(r ==0){
- return key1.compareTo(key2);
- }
- return r;
+ if (r == 0) {
+ continue;
+ }
+ if (!so.isAscendant()) {
+ r = -r;
+ }
+ return r;
+ }
+ // default to sort by groupby fields ascendently
+ if (r == 0) {
+ return key1.compareTo(key2);
+ }
+ return r;
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
index d1578ac..c848122 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
@@ -19,31 +19,36 @@ package org.apache.eagle.query.aggregate.timeseries;
/**
* sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field3 desc
* There are 2 SortOption object, then
- * the 1st one is inGroupby=false, index=0, ascendent=true
- * the 2nd one is inGroupby=true, index=1, ascendent=false
+ * the 1st one is inGroupby = false, index=0, ascendent=true
+ * the 2nd one is inGroupby = true, index=1, ascendent=false
*
*/
public class SortOption {
- private boolean inGroupby; // sort field defaultly is not from groupby fields
- private int index; // index relative to list of groupby fields or list of functions
- private boolean ascendant; //asc or desc
+ private boolean inGroupby; // sort field defaultly is not from groupby fields
+ private int index; // index relative to list of groupby fields or list of functions
+ private boolean ascendant; //asc or desc
- public boolean isInGroupby() {
- return inGroupby;
- }
- public void setInGroupby(boolean inGroupby) {
- this.inGroupby = inGroupby;
- }
- public int getIndex() {
- return index;
- }
- public void setIndex(int index) {
- this.index = index;
- }
- public boolean isAscendant() {
- return ascendant;
- }
- public void setAscendant(boolean ascendant) {
- this.ascendant = ascendant;
- }
+ public boolean isInGroupby() {
+ return inGroupby;
+ }
+
+ public void setInGroupby(boolean inGroupby) {
+ this.inGroupby = inGroupby;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public void setIndex(int index) {
+ this.index = index;
+ }
+
+ public boolean isAscendant() {
+ return ascendant;
+ }
+
+ public void setAscendant(boolean ascendant) {
+ this.ascendant = ascendant;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
index 1360e0c..2457b4e 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
@@ -25,45 +25,45 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SortOptionsParser {
- private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
- private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
-
- public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){
- List<SortOption> list = new ArrayList<SortOption>();
- for(String sortOption : sortOptions){
- Matcher m = pattern.matcher(sortOption);
- if(!m.find()){
- throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
- }
- String field = m.group(1);
- if (sortFields != null) {
- sortFields.add(field);
- }
- SortOption so = new SortOption();
- list.add(so);
- so.setAscendant(m.group(2).equals("asc") ? true : false);
- int index = aggregatedFields.indexOf(field);
- if(index > -1){
- so.setInGroupby(false);
- so.setIndex(index);
- continue;
- }
- if(groupbyFields != null){ // if groupbyFields is not provided, ignore this sort field
- index = groupbyFields.indexOf(field);
- if(index > -1){
- so.setInGroupby(true);
- so.setIndex(index);
- continue;
- }
- }
- logNonExistingSortByField(field);
- so.setInGroupby(false);
- so.setIndex(-1);
- }
- return list;
- }
-
- private static void logNonExistingSortByField(String sortByField){
- LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
+ private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
+
+ public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields) {
+ List<SortOption> list = new ArrayList<SortOption>();
+ for (String sortOption : sortOptions) {
+ Matcher m = pattern.matcher(sortOption);
+ if (!m.find()) {
+ throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
+ }
+ String field = m.group(1);
+ if (sortFields != null) {
+ sortFields.add(field);
+ }
+ SortOption so = new SortOption();
+ list.add(so);
+ so.setAscendant(m.group(2).equals("asc") ? true : false);
+ int index = aggregatedFields.indexOf(field);
+ if (index > -1) {
+ so.setInGroupby(false);
+ so.setIndex(index);
+ continue;
+ }
+ if (groupbyFields != null) { // if groupbyFields is not provided, ignore this sort field
+ index = groupbyFields.indexOf(field);
+ if (index > -1) {
+ so.setInGroupby(true);
+ so.setIndex(index);
+ continue;
+ }
+ }
+ logNonExistingSortByField(field);
+ so.setInGroupby(false);
+ so.setIndex(-1);
+ }
+ return list;
+ }
+
+ private static void logNonExistingSortByField(String sortByField) {
+ LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
index d8b781e..f4eabcd 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
@@ -18,18 +18,18 @@ package org.apache.eagle.query.aggregate.timeseries;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-public class SynchronizedAggregator implements Aggregator{
- private Object mutex = new Object();
- private Aggregator agg;
-
- public SynchronizedAggregator(Aggregator agg){
- this.agg = agg;
- }
-
- @Override
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- synchronized(mutex){
- agg.accumulate(entity);
- }
- }
-}
+public class SynchronizedAggregator implements Aggregator {
+ private Object mutex = new Object();
+ private Aggregator agg;
+
+ public SynchronizedAggregator(Aggregator agg) {
+ this.agg = agg;
+ }
+
+ @Override
+ public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+ synchronized (mutex) {
+ agg.accumulate(entity);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
index 7c1412e..baa89be 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
@@ -19,18 +19,18 @@ package org.apache.eagle.query.aggregate.timeseries;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.EntityCreationListener;
-public class SynchronizedEntityCreationListener implements EntityCreationListener{
- private Object mutex = new Object();
- private EntityCreationListener listener;
-
- public SynchronizedEntityCreationListener(EntityCreationListener listener){
- this.listener = listener;
- }
-
- @Override
- public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
- synchronized(mutex){
- listener.entityCreated(entity);
- }
- }
+public class SynchronizedEntityCreationListener implements EntityCreationListener {
+ private Object mutex = new Object();
+ private EntityCreationListener listener;
+
+ public SynchronizedEntityCreationListener(EntityCreationListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+ synchronized (mutex) {
+ listener.entityCreated(entity);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
index 5bebe13..e142657 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map;
/**
- * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would
+ * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would
* save memory for holding all the data in the memory
*
* <h3>Aggregate Bucket Structure</h3>
@@ -41,129 +41,135 @@ import java.util.Map;
*
*/
public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable {
- private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
- private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
- private long startTime;
- private long endTime;
- private long intervalms;
- private int numFunctions;
- private int ignoredEntityCounter = 0;
-
- public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
- long startTime, long endTime, long intervalms){
- super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
- // guard to avoid too many data points returned
-// validateTimeRange(startTime, endTime, intervalms);
- this.startTime = startTime;
- this.endTime = endTime;
- this.intervalms = intervalms;
- this.numFunctions = aggregateFuntionTypes.size();
- }
+ private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
+ private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
+ private long startTime;
+ private long endTime;
+ private long intervalms;
+ private int numFunctions;
+ private int ignoredEntityCounter = 0;
-// @Deprecated
-// public static void validateTimeRange(long startTime, long endTime, long intervalms){
-// if(startTime >= endTime || intervalms <= 0){
-// throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
-// }
-// if((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT){
-// throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + ", current # of datapoints is " + (endTime-startTime)/intervalms);
-// }
-// }
-
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- List<String> groupbyFieldValues = createGroup(entity);
- // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
- // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
- if(entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime){
- if(LOG.isDebugEnabled()) LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
- this.ignoredEntityCounter ++;
- return;
- }
- // time series bucket index
- long located =(entity.getTimestamp() - startTime)/intervalms;
- groupbyFieldValues.add(String.valueOf(located));
- List<Double> preAggregatedValues = createPreAggregatedValues(entity);
- bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
- }
-
- public Map<List<String>, List<Double>> result(){
- if(this.ignoredEntityCounter > 0)
- LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
- return bucket.result();
- }
+ public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
+ long startTime, long endTime, long intervalms) {
+ super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+ // guard to avoid too many data points returned
+ // validateTimeRange(startTime, endTime, intervalms);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.intervalms = intervalms;
+ this.numFunctions = aggregateFuntionTypes.size();
+ }
- /**
- * Support new aggregate result
- *
- * @return
- */
- @Override
- public List<GroupbyKeyValue> getGroupbyKeyValues(){
- if(this.ignoredEntityCounter > 0)
- LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
- return bucket.getGroupbyKeyValue();
- }
-
- public Map<List<String>, List<double[]>> getMetric(){
- // groupbyfields+timeseriesbucket --> aggregatedvalues for different function
- Map<List<String>, List<Double>> result = bucket.result();
-// Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
-// /**
-// * bug fix: startTime is inclusive and endTime is exclusive
-// */
-//// int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-// int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
-// for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
-// // get groups
-// List<String> groupbyFields = entry.getKey();
-// List<String> copy = new ArrayList<String>(groupbyFields);
-// String strTimeseriesIndex = copy.remove(copy.size()-1);
-// List<double[]> functionValues = timeseriesDatapoints.get(copy);
-// if(functionValues == null){
-// functionValues = new ArrayList<double[]>();
-// timeseriesDatapoints.put(copy, functionValues);
-// for(int i=0; i<numFunctions; i++){
-// functionValues.add(new double[numDatapoints]);
-// }
-// }
-// int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
-// int functionIndex = 0;
-// for(double[] values : functionValues){
-// values[timeseriesIndex] = entry.getValue().get(functionIndex);
-// functionIndex++;
-// }
-// }
-// return timeseriesDatapoints;
- return toMetric(result,(int)((endTime-1-startTime)/intervalms + 1),this.numFunctions);
- }
+ // @Deprecated
+ // public static void validateTimeRange(long startTime, long endTime, long intervalms) {
+ // if (startTime >= endTime || intervalms <= 0) {
+ // throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and "
+ // + "interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
+ // }
+ // if ((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT) {
+ // throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT
+ // + ", current # of datapoints is " + (endTime-startTime)/intervalms);
+ // }
+ // }
- public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){
- Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
- /**
- * bug fix: startTime is inclusive and endTime is exclusive
- */
-// int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-// int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
- for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
- // get groups
- List<String> groupbyFields = entry.getKey();
- List<String> copy = new ArrayList<String>(groupbyFields);
- String strTimeseriesIndex = copy.remove(copy.size()-1);
- List<double[]> functionValues = timeseriesDatapoints.get(copy);
- if(functionValues == null){
- functionValues = new ArrayList<double[]>();
- timeseriesDatapoints.put(copy, functionValues);
- for(int i=0; i<numFunctions; i++){
- functionValues.add(new double[numDatapoints]);
- }
- }
- int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
- int functionIndex = 0;
- for(double[] values : functionValues){
- values[timeseriesIndex] = entry.getValue().get(functionIndex);
- functionIndex++;
- }
- }
- return timeseriesDatapoints;
- }
+ public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+ List<String> groupbyFieldValues = createGroup(entity);
+ // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
+ // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
+ if (entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
+ }
+ this.ignoredEntityCounter ++;
+ return;
+ }
+ // time series bucket index
+ long located = (entity.getTimestamp() - startTime) / intervalms;
+ groupbyFieldValues.add(String.valueOf(located));
+ List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+ bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+ }
+
+ public Map<List<String>, List<Double>> result() {
+ if (this.ignoredEntityCounter > 0) {
+ LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime);
+ }
+ return bucket.result();
+ }
+
+ /**
+ * Support new aggregate result
+ *
+ * @return
+ */
+ @Override
+ public List<GroupbyKeyValue> getGroupbyKeyValues() {
+ if (this.ignoredEntityCounter > 0) {
+ LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime);
+ }
+ return bucket.getGroupbyKeyValue();
+ }
+
+ public Map<List<String>, List<double[]>> getMetric() {
+ // groupbyfields+timeseriesbucket --> aggregatedvalues for different function
+ Map<List<String>, List<Double>> result = bucket.result();
+ // Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
+ // /**
+ // * bug fix: startTime is inclusive and endTime is exclusive
+ // */
+ //// int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
+ // int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
+ // for (Map.Entry<List<String>, List<Double>> entry : result.entrySet()) {
+ // // get groups
+ // List<String> groupbyFields = entry.getKey();
+ // List<String> copy = new ArrayList<String>(groupbyFields);
+ // String strTimeseriesIndex = copy.remove(copy.size()-1);
+ // List<double[]> functionValues = timeseriesDatapoints.get(copy);
+ // if (functionValues == null) {
+ // functionValues = new ArrayList<double[]>();
+ // timeseriesDatapoints.put(copy, functionValues);
+ // for (int i = 0; i<numFunctions; i++) {
+ // functionValues.add(new double[numDatapoints]);
+ // }
+ // }
+ // int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
+ // int functionIndex = 0;
+ // for (double[] values : functionValues) {
+ // values[timeseriesIndex] = entry.getValue().get(functionIndex);
+ // functionIndex++;
+ // }
+ // }
+ // return timeseriesDatapoints;
+ return toMetric(result,(int)((endTime - 1 - startTime) / intervalms + 1), this.numFunctions);
+ }
+
+ public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions) {
+ Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
+ /**
+ * bug fix: startTime is inclusive and endTime is exclusive
+ */
+ // int numDatapoints = (int)((endTime-startTime)/intervalms + 1);
+ // int numDatapoints = (int)((endTime-1-startTime)/intervalms + 1);
+ for (Map.Entry<List<String>, List<Double>> entry : result.entrySet()) {
+ // get groups
+ List<String> groupbyFields = entry.getKey();
+ List<String> copy = new ArrayList<String>(groupbyFields);
+ String strTimeseriesIndex = copy.remove(copy.size() - 1);
+ List<double[]> functionValues = timeseriesDatapoints.get(copy);
+ if (functionValues == null) {
+ functionValues = new ArrayList<double[]>();
+ timeseriesDatapoints.put(copy, functionValues);
+ for (int i = 0; i < numFunctions; i++) {
+ functionValues.add(new double[numDatapoints]);
+ }
+ }
+ int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
+ int functionIndex = 0;
+ for (double[] values : functionValues) {
+ values[timeseriesIndex] = entry.getValue().get(functionIndex);
+ functionIndex++;
+ }
+ }
+ return timeseriesDatapoints;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
index d662658..78fa010 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
@@ -26,51 +26,51 @@ import org.slf4j.LoggerFactory;
* only numeric aggregation is supported and number type supported is double
*/
public class TimeSeriesBucket {
- private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
- private long startTime;
- private long endTime;
- private long interval;
-
- // map of aggregation function to aggregated values
- List<double[]> aggregatedValues = new ArrayList<double[]>();
-
- // align from the startTime
- /**
- *
- * @param startTime milliseconds
- * @param endTime milliseconds
- * @param intervalMillseconds
- * @param aggFunctions
- */
- public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){
- int count =(int)((endTime-startTime)/intervalms);
- for(int i=0; i<numAggFunctions; i++){
- aggregatedValues.add(new double[count]);
- }
- }
-
- /**
- * add datapoint which has a list of values for different aggregate functions
- * for example, sum(numHosts), count(*), avg(timespan) etc
- * @param timestamp
- * @param values
- */
- public void addDataPoint(long timestamp, List<Double> values){
- // locate timeseries bucket
- if(timestamp < startTime || timestamp > endTime){
- LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
- return;
- }
- int located =(int)((timestamp - startTime)/interval);
- int index = 0;
- for(Double src : values){
- double[] timeSeriesValues = aggregatedValues.get(index);
- timeSeriesValues[located] += src;
- index++;
- }
- }
-
- public List<double[]> aggregatedValues(){
- return this.aggregatedValues;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
+ private long startTime;
+ private long endTime;
+ private long interval;
+
+ // map of aggregation function to aggregated values
+ List<double[]> aggregatedValues = new ArrayList<double[]>();
+
+ // align from the startTime
+ /**
+ *
+ * @param startTime milliseconds
+ * @param endTime milliseconds
+ * @param intervalMillseconds
+ * @param aggFunctions
+ */
+ public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions) {
+ int count = (int)((endTime - startTime) / intervalms);
+ for (int i = 0; i < numAggFunctions; i++) {
+ aggregatedValues.add(new double[count]);
+ }
+ }
+
+ /**
+ * add datapoint which has a list of values for different aggregate functions
+ * for example, sum(numHosts), count(*), avg(timespan) etc
+ * @param timestamp
+ * @param values
+ */
+ public void addDataPoint(long timestamp, List<Double> values) {
+ // locate timeseries bucket
+ if (timestamp < startTime || timestamp > endTime) {
+ LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
+ return;
+ }
+ int located = (int)((timestamp - startTime) / interval);
+ int index = 0;
+ for (Double src : values) {
+ double[] timeSeriesValues = aggregatedValues.get(index);
+ timeSeriesValues[located] += src;
+ index++;
+ }
+ }
+
+ public List<double[]> aggregatedValues() {
+ return this.aggregatedValues;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
index c0a6e06..ae00fdf 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
@@ -25,127 +25,127 @@ import java.util.SortedSet;
import java.util.TreeSet;
public class TimeSeriesPostFlatAggregateSort {
- // private static final Logger logger =
- // LoggerFactory.getLogger(PostFlatAggregateSort.class);
-
- private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
- Map<List<String>, List<Double>> mapForSort,
- List<SortOption> sortOptions) {
- SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(
- new MapEntryComparator(sortOptions));
- sortedEntries.addAll(mapForSort.entrySet());
- return sortedEntries;
- }
-
- /**
- * sort aggregated results with sort options
- *
- * @param entity
- */
- public static List<Map.Entry<List<String>, List<double[]>>> sort(
- Map<List<String>, List<Double>> mapForSort,
- Map<List<String>, List<double[]>> valueMap,
- List<SortOption> sortOptions, int topN) {
-
- processIndex(sortOptions);
- List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
- SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue(
- mapForSort, sortOptions);
- for (Map.Entry<List<String>, List<Double>> entry : sortedSet) {
- List<String> key = entry.getKey();
- List<double[]> value = valueMap.get(key);
- if (value != null) {
- Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value);
- result.add(newEntry);
- if (topN > 0 && result.size() >= topN) {
- break;
- }
- }
- }
- return result;
- }
-
- private static void processIndex(List<SortOption> sortOptions) {
- for (int i = 0; i < sortOptions.size(); ++i) {
- SortOption so = sortOptions.get(i);
- so.setIndex(i);
- }
- }
-
- private static class MapEntryComparator implements
- Comparator<Map.Entry<List<String>, List<Double>>> {
- private List<SortOption> sortOptions;
-
- public MapEntryComparator(List<SortOption> sortOptions) {
- this.sortOptions = sortOptions;
- }
-
- /**
- * default to sort by all groupby fields
- */
- @Override
- public int compare(Map.Entry<List<String>, List<Double>> e1,
- Map.Entry<List<String>, List<Double>> e2) {
- int r = 0;
- List<String> keyList1 = e1.getKey();
- List<Double> valueList1 = e1.getValue();
- List<String> keyList2 = e2.getKey();
- List<Double> valueList2 = e2.getValue();
- for (SortOption so : sortOptions) {
- int index = so.getIndex();
- if (index == -1) {
- continue;
- }
- if (!so.isInGroupby()) { // sort fields come from functions
- Double value1 = valueList1.get(index);
- Double value2 = valueList2.get(index);
- r = value1.compareTo(value2);
- } else { // sort fields come from groupby fields
- String key1 = keyList1.get(index);
- String key2 = keyList2.get(index);
- r = key1.compareTo(key2);
- }
- if (r == 0)
- continue;
- if (!so.isAscendant()) {
- r = -r;
- }
- return r;
- }
- // default to sort by groupby fields ascendently
- if (r == 0) { // TODO is this check necessary
- return new GroupbyFieldsComparator()
- .compare(keyList1, keyList2);
- }
- return r;
- }
- }
-
- static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
- private final K key;
- private final V value;
-
- ImmutableEntry(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public V getValue() {
- return value;
- }
-
- @Override
- public final V setValue(V value) {
- throw new UnsupportedOperationException();
- }
-
- private static final long serialVersionUID = 0;
- }
+ // private static final Logger logger =
+ // LoggerFactory.getLogger(PostFlatAggregateSort.class);
+
+    /**
+     * Copies the entries of {@code mapForSort} into a set ordered by a
+     * {@link MapEntryComparator} built from {@code sortOptions}.
+     */
+    private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
+            Map<List<String>, List<Double>> mapForSort,
+            List<SortOption> sortOptions) {
+        Comparator<Map.Entry<List<String>, List<Double>>> ordering = new MapEntryComparator(sortOptions);
+        SortedSet<Map.Entry<List<String>, List<Double>>> ordered =
+                new TreeSet<Map.Entry<List<String>, List<Double>>>(ordering);
+        ordered.addAll(mapForSort.entrySet());
+        return ordered;
+    }
+
+    /**
+     * Sorts aggregated results with the given sort options and returns the matching
+     * time series values in that order.
+     *
+     * @param mapForSort group key to the aggregated values the ordering is computed from
+     * @param valueMap group key to the time series values that are returned
+     * @param sortOptions sort options; each option's index is set to its position in this list
+     * @param topN maximum number of entries to return; 0 or a negative value means no limit
+     * @return entries taken from {@code valueMap} in sorted order; keys without a value
+     *         in {@code valueMap} are skipped
+     */
+    public static List<Map.Entry<List<String>, List<double[]>>> sort(
+            Map<List<String>, List<Double>> mapForSort,
+            Map<List<String>, List<double[]>> valueMap,
+            List<SortOption> sortOptions, int topN) {
+
+        processIndex(sortOptions);
+        SortedSet<Map.Entry<List<String>, List<Double>>> ordered = sortByValue(mapForSort, sortOptions);
+        List<Map.Entry<List<String>, List<double[]>>> top = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+        for (Map.Entry<List<String>, List<Double>> candidate : ordered) {
+            List<String> groupKey = candidate.getKey();
+            List<double[]> series = valueMap.get(groupKey);
+            if (series == null) {
+                // nothing to report for this group
+                continue;
+            }
+            top.add(new ImmutableEntry<List<String>, List<double[]>>(groupKey, series));
+            if (topN > 0 && top.size() >= topN) {
+                break;
+            }
+        }
+        return top;
+    }
+
+    /**
+     * Sets the index of every sort option to its position in {@code sortOptions}.
+     */
+    private static void processIndex(List<SortOption> sortOptions) {
+        int position = 0;
+        for (SortOption option : sortOptions) {
+            option.setIndex(position);
+            position++;
+        }
+    }
+
+    /**
+     * Orders map entries by the configured sort options, in list order, and falls back
+     * to comparing the groupby keys when every sort option compares equal.
+     */
+    private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> {
+        private final List<SortOption> sortOptions;
+
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
+
+        /**
+         * Compares two entries by the first sort option that tells them apart;
+         * defaults to sorting by all groupby fields.
+         */
+        @Override
+        public int compare(Map.Entry<List<String>, List<Double>> e1,
+                           Map.Entry<List<String>, List<Double>> e2) {
+            List<String> keyList1 = e1.getKey();
+            List<Double> valueList1 = e1.getValue();
+            List<String> keyList2 = e2.getKey();
+            List<Double> valueList2 = e2.getValue();
+            for (SortOption so : sortOptions) {
+                int index = so.getIndex();
+                if (index == -1) {
+                    // options with index -1 take no part in the comparison
+                    continue;
+                }
+                int r;
+                if (!so.isInGroupby()) { // sort fields come from functions
+                    Double value1 = valueList1.get(index);
+                    Double value2 = valueList2.get(index);
+                    r = value1.compareTo(value2);
+                } else { // sort fields come from groupby fields
+                    String key1 = keyList1.get(index);
+                    String key2 = keyList2.get(index);
+                    r = key1.compareTo(key2);
+                }
+                if (r == 0) {
+                    // tie on this option, let the next one decide
+                    continue;
+                }
+                return so.isAscendant() ? r : -r;
+            }
+            // The loop only finishes when every option compared equal (any difference
+            // returns above), so no further check is needed here:
+            // default to sorting by groupby fields in ascending order.
+            return new GroupbyFieldsComparator().compare(keyList1, keyList2);
+        }
+    }
+
+    /**
+     * Read-only {@link Map.Entry}: key and value are fixed at construction and
+     * {@link #setValue(Object)} always throws.
+     */
+    static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
+        private static final long serialVersionUID = 0L;
+
+        private final K key;
+        private final V value;
+
+        ImmutableEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /** Returns the key given at construction. */
+        @Override
+        public K getKey() {
+            return this.key;
+        }
+
+        /** Returns the value given at construction. */
+        @Override
+        public V getValue() {
+            return this.value;
+        }
+
+        /** Not supported: the entry cannot be modified. */
+        @Override
+        public final V setValue(V value) {
+            throw new UnsupportedOperationException();
+        }
+    }
}