You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ja...@apache.org on 2018/02/07 07:05:49 UTC

[5/7] eagle git commit: [EAGLE-1080] Fix checkstyle errors in the eagle-query-base module

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
index e12fea3..15e89ad 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
@@ -26,36 +26,36 @@ import org.apache.eagle.query.aggregate.AggregateFunctionType;
 /**
  * Not thread safe
  */
-public class FlatAggregator extends AbstractAggregator{
-	protected GroupbyBucket bucket;
+public class FlatAggregator extends AbstractAggregator {
+    protected GroupbyBucket bucket;
 
     /**
      * @param groupbyFields
      * @param aggregateFuntionTypes
      * @param aggregatedFields
      */
-	public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-		bucket = new GroupbyBucket(this.aggregateFunctionTypes);
-	}
-	
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<String> groupbyFieldValues = createGroup(entity);
-		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-		bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
-	}
-	
-	public Map<List<String>, List<Double>> result(){
-		return bucket.result(); 
-	}
-	
-	protected List<String> createGroup(TaggedLogAPIEntity entity){
-		List<String> groupbyFieldValues = new ArrayList<String>();
-		int i = 0;
-		for(String groupbyField : groupbyFields){
-			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
-			groupbyFieldValues.add(groupbyFieldValue);
-		}
-		return groupbyFieldValues;
-	}
+    public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+        bucket = new GroupbyBucket(this.aggregateFunctionTypes);
+    }
+
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<String> groupbyFieldValues = createGroup(entity);
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+    }
+
+    public Map<List<String>, List<Double>> result() {
+        return bucket.result();
+    }
+
+    protected List<String> createGroup(TaggedLogAPIEntity entity) {
+        List<String> groupbyFieldValues = new ArrayList<String>();
+        int i = 0;
+        for (String groupbyField : groupbyFields) {
+            String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
+            groupbyFieldValues.add(groupbyFieldValue);
+        }
+        return groupbyFieldValues;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
index ea57edb..93e65a9 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java
@@ -31,211 +31,230 @@ import java.util.List;
 import java.util.Map;
 
 public class GroupbyBucket {
-	private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
-	
-	public static Map<String, FunctionFactory> _functionFactories = 
-			new HashMap<>();
-    
-	// TODO put this logic to AggregatorFunctionType
-	static{
-		_functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
-		_functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
-		_functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
-		_functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
-		_functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
-	}
-	
-	private List<AggregateFunctionType> types;
-//	private SortedMap<List<String>, List<Function>> group2FunctionMap = 
-//			new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
-	
-	private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
-	
-	public GroupbyBucket(List<AggregateFunctionType> types){
-		this.types = types;
-	}
-	
-	public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){
-		// LOG.info("DEBUG: addDatapoint: groupby=["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
-		
-		// locate groupby bucket
-		List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
-		if(functions == null){
-			functions = new ArrayList<Function>();
-			for(AggregateFunctionType type : types){
-				functions.add(_functionFactories.get(type.name()).createFunction());
-			}
-			group2FunctionMap.put(groupbyFieldValues, functions);
-		}
-		int functionIndex = 0;
-		for(Double v : values){
-			functions.get(functionIndex).run(v);
-			functionIndex++;
-		}
-	}
-	
-	public Map<List<String>, List<Double>> result(){
-		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
-		for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
-			List<Double> values = new ArrayList<Double>();
-			for(Function f : entry.getValue()){
-				values.add(f.result());
-			}
-			result.put(entry.getKey(), values);
-		}
-		return result;
-	}
-
-	public List<GroupbyKeyValue> getGroupbyKeyValue(){
-		List<GroupbyKeyValue>  results = new ArrayList<GroupbyKeyValue>();
-		
-		for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
-			GroupbyKey key = new GroupbyKey();
-			for(String keyStr:entry.getKey()){
-				try {
-					key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
-				} catch (UnsupportedEncodingException e) {
-					LOG.error(e.getMessage(),e);
-				}
-			}
-			GroupbyValue value = new GroupbyValue();
-			for(Function f : entry.getValue()){
-				value.add(f.result());
-				value.addMeta(f.count());
-			}
-			results.add(new GroupbyKeyValue(key,value));
-		}
-		
-		return results;
-	}
-	
-	public static interface FunctionFactory{
-		public Function createFunction();
-	}
-	
-	public static abstract class Function{
-		protected int count;
-
-		public abstract void run(double v);
-		public abstract double result();
-		public int count(){
-			return count;
-		}
-		public void incrCount(){
-			count ++;
-		}
-	}
-
-	private static class CountFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Count();
-		}
-	}
-	
-	
-	private static class Count extends Sum{
-		public Count(){
-			super();
-		}
-	}
-	
-	private static class SumFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Sum();
-		}
-	}
-	
-	private static class Sum extends Function{
-		private double summary;
-		public Sum(){
-			this.summary = 0.0;
-		}
-		@Override
-		public void run(double v){
-			this.incrCount();
-			this.summary += v;
-		}
-		
-		@Override
-		public double result(){
-			return this.summary;
-		}
-	}
-	
-	private static class MinFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Min();
-		}
-	}
-	public static class Min extends Function{
-		private double minimum;
-		public Min(){
-			// TODO is this a bug, or only positive numeric calculation is supported
-			this.minimum = Double.MAX_VALUE;
-		}
-
-		@Override
-		public void run(double v){
-			if(v < minimum){
-				minimum = v;
-			}
-			this.incrCount();
-		}
-		
-		@Override
-		public double result(){
-			return minimum;
-		}
-	}
-	
-	private static class MaxFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Max();
-		}
-	}
-	public static class Max extends Function{
-		private double maximum;
-		public Max(){
-			// TODO is this a bug, or only positive numeric calculation is supported
-			this.maximum = 0.0;
-		}
-		@Override
-		public void run(double v){
-			if(v > maximum){
-				maximum = v;
-			}
-			this.incrCount();
-		}
-		
-		@Override
-		public double result(){
-			return maximum;
-		}
-	}
-	
-	private static class AvgFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Avg();
-		}
-	}
-	public static class Avg extends Function{
-		private double total;
-		public Avg(){
-			this.total = 0.0;
-		}
-		@Override
-		public void run(double v){
-			total += v;
-			this.incrCount();
-		}
-		@Override
-		public double result(){
-			return this.total/this.count;
-		}
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
+
+    public static Map<String, FunctionFactory> functionFactories = new HashMap<>();
+
+    // TODO put this logic to AggregatorFunctionType
+    static {
+        functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
+        functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
+        functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
+        functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
+        functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
+    }
+
+    private List<AggregateFunctionType> types;
+    // private SortedMap<List<String>, List<Function>> group2FunctionMap =
+    //     new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
+
+    private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
+
+    public GroupbyBucket(List<AggregateFunctionType> types) {
+        this.types = types;
+    }
+
+    public void addDatapoint(List<String> groupbyFieldValues, List<Double> values) {
+        // LOG.info("DEBUG: addDatapoint: groupby =["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
+
+        // locate groupby bucket
+        List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
+        if (functions == null) {
+            functions = new ArrayList<Function>();
+            for (AggregateFunctionType type : types) {
+                functions.add(functionFactories.get(type.name()).createFunction());
+            }
+            group2FunctionMap.put(groupbyFieldValues, functions);
+        }
+        int functionIndex = 0;
+        for (Double v : values) {
+            functions.get(functionIndex).run(v);
+            functionIndex++;
+        }
+    }
+
+    public Map<List<String>, List<Double>> result() {
+        Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
+        for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+            List<Double> values = new ArrayList<Double>();
+            for (Function f : entry.getValue()) {
+                values.add(f.result());
+            }
+            result.put(entry.getKey(), values);
+        }
+        return result;
+    }
+
+    public List<GroupbyKeyValue> getGroupbyKeyValue() {
+        List<GroupbyKeyValue>  results = new ArrayList<GroupbyKeyValue>();
+
+        for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) {
+            GroupbyKey key = new GroupbyKey();
+            for (String keyStr:entry.getKey()) {
+                try {
+                    key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
+                } catch (UnsupportedEncodingException e) {
+                    LOG.error(e.getMessage(),e);
+                }
+            }
+            GroupbyValue value = new GroupbyValue();
+            for (Function f : entry.getValue()) {
+                value.add(f.result());
+                value.addMeta(f.count());
+            }
+            results.add(new GroupbyKeyValue(key,value));
+        }
+
+        return results;
+    }
+
+    public static interface FunctionFactory {
+        public Function createFunction();
+    }
+
+    public abstract static class Function {
+        protected int count;
+
+        public abstract void run(double v);
+
+        public abstract double result();
+
+        public int count() {
+            return count;
+        }
+
+        public void incrCount() {
+            count ++;
+        }
+    }
+
+    private static class CountFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Count();
+        }
+    }
+
+
+    private static class Count extends Sum {
+
+        public Count() {
+            super();
+        }
+    }
+
+    private static class SumFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Sum();
+        }
+    }
+
+    private static class Sum extends Function {
+        private double summary;
+
+        public Sum() {
+            this.summary = 0.0;
+        }
+
+        @Override
+        public void run(double v) {
+            this.incrCount();
+            this.summary += v;
+        }
+
+        @Override
+        public double result() {
+            return this.summary;
+        }
+    }
+
+    private static class MinFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Min();
+        }
+    }
+
+    public static class Min extends Function {
+        private double minimum;
+
+        public Min() {
+            // TODO is this a bug, or only positive numeric calculation is supported
+            this.minimum = Double.MAX_VALUE;
+        }
+
+        @Override
+        public void run(double v) {
+            if (v < minimum) {
+                minimum = v;
+            }
+            this.incrCount();
+        }
+
+        @Override
+        public double result() {
+            return minimum;
+        }
+    }
+
+    private static class MaxFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Max();
+        }
+    }
+
+    public static class Max extends Function {
+        private double maximum;
+
+        public Max() {
+            // TODO is this a bug, or only positive numeric calculation is supported
+            this.maximum = 0.0;
+        }
+
+        @Override
+        public void run(double v) {
+            if (v > maximum) {
+                maximum = v;
+            }
+            this.incrCount();
+        }
+
+        @Override
+        public double result() {
+            return maximum;
+        }
+    }
+
+    private static class AvgFactory implements FunctionFactory {
+
+        @Override
+        public Function createFunction() {
+            return new Avg();
+        }
+    }
+
+    public static class Avg extends Function {
+        private double total;
+
+        public Avg() {
+            this.total = 0.0;
+        }
+
+        @Override
+        public void run(double v) {
+            total += v;
+            this.incrCount();
+        }
+
+        @Override
+        public double result() {
+            return this.total / this.count;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
index 6635483..b612ccf 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
@@ -22,18 +22,21 @@ import java.util.List;
 /**
  * this is default comparator for aggregation. The behavior is to sort by groupby fields ascendantly
  */
-public class GroupbyFieldsComparator implements Comparator<List<String>>{
-	@Override 
-    public int compare(List<String> list1, List<String> list2){
-		if(list1 == null || list2 == null || list1.size() != list2.size())
-			throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
-		int r = 0;
-		int index = 0;
-		for(String s1 : list1){
-			r = s1.compareTo(list2.get(index++));
-			if(r != 0)
-				return r;
-		}
-		return r;
-	}
+public class GroupbyFieldsComparator implements Comparator<List<String>> {
+
+    @Override
+    public int compare(List<String> list1, List<String> list2) {
+        if (list1 == null || list2 == null || list1.size() != list2.size()) {
+            throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
+        }
+        int r = 0;
+        int index = 0;
+        for (String s1 : list1) {
+            r = s1.compareTo(list2.get(index++));
+            if (r != 0) {
+                return r;
+            }
+        }
+        return r;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
index 341fa00..559061b 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
@@ -25,43 +25,52 @@ import java.util.TreeMap;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class HierarchicalAggregateEntity {
-	private String key;
-	private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
-	private List<Double> values = new ArrayList<Double>();
-	private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
-	private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
+    private String key;
+    private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
+    private List<Double> values = new ArrayList<Double>();
+    private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
+    private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
 
-	public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
-		return sortedList;
-	}
-	public void setSortedList(
-			SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
-		this.sortedList = sortedList;
-	}
-	public List<GroupbyBucket.Function> getTmpValues() {
-		return tmpValues;
-	}
-	public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
-		this.tmpValues = tmpValues;
-	}
-	public String getKey() {
-		return key;
-	}
-	public void setKey(String key) {
-		this.key = key;
-	}
-	public List<Double> getValues() {
-		return values;
-	}
-	public void setValues(List<Double> values) {
-		this.values = values;
-	}
-	public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
-		return children;
-	}
-	public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
-		this.children = children;
-	}
+    public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
+        return sortedList;
+    }
+
+    public void setSortedList(
+                              SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
+        this.sortedList = sortedList;
+    }
+
+    public List<GroupbyBucket.Function> getTmpValues() {
+        return tmpValues;
+    }
+
+    public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
+        this.tmpValues = tmpValues;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public List<Double> getValues() {
+        return values;
+    }
+
+    public void setValues(List<Double> values) {
+        this.values = values;
+    }
+
+    public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
+        return children;
+    }
+
+    public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
+        this.children = children;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
index ecb80ac..8751a74 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
@@ -22,61 +22,61 @@ import java.util.SortedMap;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.query.aggregate.AggregateFunctionType;
 
-public class HierarchicalAggregator extends AbstractAggregator{
-	private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
+public class HierarchicalAggregator extends AbstractAggregator {
+    private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
 
-	public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-	}
+    public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+    }
 
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-		// aggregate to root first
-		addDatapoint(root, preAggregatedValues);
-		// go through hierarchical tree
-		HierarchicalAggregateEntity current = root;
-		int i = 0;
-		for(String groupbyField : groupbyFields){
-			// determine groupbyFieldValue from tag or fields
-			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
-			SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
-			if(children.get(groupbyFieldValue) == null){
-				HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
-				children.put(groupbyFieldValue, tmp);
-			}
-			children.get(groupbyFieldValue).setKey(groupbyFieldValue);
-			addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
-			current = children.get(groupbyFieldValue);
-		}
-	}
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        // aggregate to root first
+        addDatapoint(root, preAggregatedValues);
+        // go through hierarchical tree
+        HierarchicalAggregateEntity current = root;
+        int i = 0;
+        for (String groupbyField : groupbyFields) {
+            // determine groupbyFieldValue from tag or fields
+            String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
+            SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
+            if (children.get(groupbyFieldValue) == null) {
+                HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
+                children.put(groupbyFieldValue, tmp);
+            }
+            children.get(groupbyFieldValue).setKey(groupbyFieldValue);
+            addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
+            current = children.get(groupbyFieldValue);
+        }
+    }
 
-	private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){
-		List<GroupbyBucket.Function> functions = entity.getTmpValues();
-		// initialize list of function
-		if(functions.isEmpty()){
-			for(AggregateFunctionType type : aggregateFunctionTypes){
-				functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction());
-			}
-		}
-		int functionIndex = 0;
-		for(Double v : values){
-			functions.get(functionIndex).run(v);
-			functionIndex++;
-		}
-	}
+    private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values) {
+        List<GroupbyBucket.Function> functions = entity.getTmpValues();
+        // initialize list of function
+        if (functions.isEmpty()) {
+            for (AggregateFunctionType type : aggregateFunctionTypes) {
+                functions.add(GroupbyBucket.functionFactories.get(type.name()).createFunction());
+            }
+        }
+        int functionIndex = 0;
+        for (Double v : values) {
+            functions.get(functionIndex).run(v);
+            functionIndex++;
+        }
+    }
 
-	private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){
-		for(GroupbyBucket.Function f : entity.getTmpValues()){
-			entity.getValues().add(f.result());
-		}
-		for(HierarchicalAggregateEntity child : entity.getChildren().values()){
-			finalizeHierarchicalAggregateEntity(child);
-		}
-		entity.setTmpValues(null);
-	}
+    private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity) {
+        for (GroupbyBucket.Function f : entity.getTmpValues()) {
+            entity.getValues().add(f.result());
+        }
+        for (HierarchicalAggregateEntity child : entity.getChildren().values()) {
+            finalizeHierarchicalAggregateEntity(child);
+        }
+        entity.setTmpValues(null);
+    }
 
-	public HierarchicalAggregateEntity result(){
-		finalizeHierarchicalAggregateEntity(root);
-		return this.root;
-	}
+    public HierarchicalAggregateEntity result() {
+        finalizeHierarchicalAggregateEntity(root);
+        return this.root;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
index f62d2c2..8ca24c6 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
@@ -24,70 +24,74 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 public class PostFlatAggregateSort {
-	private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
-	    SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
-	    sortedEntries.addAll(map.entrySet());
-	    return sortedEntries;
-	}
+    private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
+        sortedEntries.addAll(map.entrySet());
+        return sortedEntries;
+    }
 
-	/**
-	 * sort aggregated results with sort options
-	 * @param aggregatedResult aggregated result set, but it is not sorted
-	 * @sortOptions sorting options
-	 * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
-	 */
-	public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){
-		SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
-		List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
-		for (Map.Entry<List<String>, List<Double>> entry : allList) {
-			result.add(entry);
-			if (topN > 0 && result.size() >= topN) {
-				break;
-			}
-		}
-		return result;
-	}
+    /**
+     * sort aggregated results with sort options
+     * @param aggregatedResult aggregated result set, but it is not sorted
+     * @param sortOptions sorting options
+     * @param topN if greater than 0, only the top N results are returned; otherwise (default value 0) all results are returned
+     */
+    public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN) {
+        SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
+        List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
+        for (Map.Entry<List<String>, List<Double>> entry : allList) {
+            result.add(entry);
+            if (topN > 0 && result.size() >= topN) {
+                break;
+            }
+        }
+        return result;
+    }
+
+    private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> {
+        private List<SortOption> sortOptions;
+
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
 
-	private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{
-		private List<SortOption> sortOptions;
-		public MapEntryComparator(List<SortOption> sortOptions){
-			this.sortOptions = sortOptions;
-		}
-		/**
-		 * default to sort by all groupby fields
-		 */
-		@Override
-        public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){
-			int r = 0;
-			List<String> keyList1 = e1.getKey();
-			List<Double> valueList1 = e1.getValue();
-			List<String> keyList2 = e2.getKey();
-			List<Double> valueList2 = e2.getValue();
-			for(SortOption so : sortOptions){
-				int index = so.getIndex();
-				if (index == -1) {
-					continue;
-				}
-				if(!so.isInGroupby()){  // sort fields come from functions
-					Double value1 = valueList1.get(index);
-					Double value2 = valueList2.get(index);
-					r = value1.compareTo(value2);
-				}else{  // sort fields come from groupby fields
-					String key1 = keyList1.get(index);
-					String key2 = keyList2.get(index);
-					r = key1.compareTo(key2);
-				}
-				if(r == 0) continue;
-				if(!so.isAscendant()){
-					r = -r;
-				}
-				return r;
-			}
-			// default to sort by groupby fields ascendently
-			if(r ==0){ // TODO is this check necessary
-				return new GroupbyFieldsComparator().compare(keyList1, keyList2);
-			}
-			return r;
+        /**
+         * default to sort by all groupby fields
+         */
+        @Override
+        public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2) {
+            int r = 0;
+            List<String> keyList1 = e1.getKey();
+            List<Double> valueList1 = e1.getValue();
+            List<String> keyList2 = e2.getKey();
+            List<Double> valueList2 = e2.getValue();
+            for (SortOption so : sortOptions) {
+                int index = so.getIndex();
+                if (index == -1) {
+                    continue;
+                }
+                if (!so.isInGroupby()) {  // sort fields come from functions
+                    Double value1 = valueList1.get(index);
+                    Double value2 = valueList2.get(index);
+                    r = value1.compareTo(value2);
+                } else {  // sort fields come from groupby fields
+                    String key1 = keyList1.get(index);
+                    String key2 = keyList2.get(index);
+                    r = key1.compareTo(key2);
+                }
+                if (r == 0) {
+                    continue;
+                }
+                if (!so.isAscendant()) {
+                    r = -r;
+                }
+                return r;
+            }
+            // by default, sort by the groupby fields in ascending order
+            if (r == 0) { // TODO is this check necessary
+                return new GroupbyFieldsComparator().compare(keyList1, keyList2);
+            }
+            return r;
         }
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
index 7b0997b..bd475f9 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
@@ -24,69 +24,71 @@ import java.util.TreeSet;
 
 public class PostHierarchicalAggregateSort {
 
-	private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
-	    SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
-	    sortedEntries.addAll(entity.getChildren().entrySet());
-	    return sortedEntries;
-	}
+    private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
+        sortedEntries.addAll(entity.getChildren().entrySet());
+        return sortedEntries;
+    }
 
-	/**
-	 * sort aggregated results with sort options
+    /**
+     * sort aggregated results with sort options
      *
      * @param result
      * @param sortOptions
      * @return
      */
-	public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){
-		SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
-		result.setSortedList(tmp);
-		result.setChildren(null);
-		for(Map.Entry<String, HierarchicalAggregateEntity> entry : tmp){
-			sort(entry.getValue(), sortOptions);
-		}
-		return result;
-	}
+    public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions) {
+        SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
+        result.setSortedList(tmp);
+        result.setChildren(null);
+        for (Map.Entry<String, HierarchicalAggregateEntity> entry : tmp) {
+            sort(entry.getValue(), sortOptions);
+        }
+        return result;
+    }
+
+    private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>> {
+        private List<SortOption> sortOptions;
 
-	private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{
-		private List<SortOption> sortOptions;
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
 
-		public MapEntryComparator(List<SortOption> sortOptions){
-			this.sortOptions = sortOptions;
-		}
+        /**
+         * default to sort by all groupby fields
+         */
+        @Override
+        public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2) {
+            int r = 0;
+            String key1 = e1.getKey();
+            List<Double> valueList1 = e1.getValue().getValues();
+            String key2 = e2.getKey();
+            List<Double> valueList2 = e2.getValue().getValues();
+            for (SortOption so : sortOptions) {
+                int index = so.getIndex();
+                if (index == -1) {
+                    continue;
+                }
+                if (!so.isInGroupby()) {  // sort fields come from functions
+                    Double value1 = valueList1.get(index);
+                    Double value2 = valueList2.get(index);
+                    r = value1.compareTo(value2);
+                }
+                // sort options that refer to groupby fields are silently ignored here
 
-		/**
-		 * default to sort by all groupby fields
-		 */
-		@Override
-        public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){
-			int r = 0;
-			String key1 = e1.getKey();
-			List<Double> valueList1 = e1.getValue().getValues();
-			String key2 = e2.getKey();
-			List<Double> valueList2 = e2.getValue().getValues();
-			for(SortOption so : sortOptions){
-				int index = so.getIndex();
-				if (index == -1) {
-					continue;
-				}
-				if(!so.isInGroupby()){  // sort fields come from functions
-					Double value1 = valueList1.get(index);
-					Double value2 = valueList2.get(index);
-					r = value1.compareTo(value2);
-				}  
-				// sort fields come from groupby fields, then silently ignored
-				
-				if(r == 0) continue;
-				if(!so.isAscendant()){
-					r = -r;
-				}
-				return r;
-			}
-			// default to sort by groupby fields ascendently
-			if(r ==0){
-				return key1.compareTo(key2);
-			}
-			return r;
+                if (r == 0) {
+                    continue;
+                }
+                if (!so.isAscendant()) {
+                    r = -r;
+                }
+                return r;
+            }
+            // by default, sort by the groupby key in ascending order
+            if (r == 0) {
+                return key1.compareTo(key2);
+            }
+            return r;
         }
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
index d1578ac..c848122 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java
@@ -19,31 +19,36 @@ package org.apache.eagle.query.aggregate.timeseries;
 /**
  * sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field3 desc
  * There are 2 SortOption object, then
- * the 1st one is inGroupby=false, index=0, ascendent=true
- * the 2nd one is inGroupby=true, index=1, ascendent=false
+ * the 1st one is inGroupby=false, index=0, ascendant=true
+ * the 2nd one is inGroupby=true, index=0, ascendant=false
  *
  */
 public class SortOption {
-	private boolean inGroupby; // sort field defaultly is not from groupby fields 
-	private int index; // index relative to list of groupby fields or list of functions
-	private boolean ascendant; //asc or desc
+    private boolean inGroupby; // false by default: the sort field is not one of the groupby fields
+    private int index; // index relative to list of groupby fields or list of functions
+    private boolean ascendant; //asc or desc
 
-	public boolean isInGroupby() {
-		return inGroupby;
-	}
-	public void setInGroupby(boolean inGroupby) {
-		this.inGroupby = inGroupby;
-	}
-	public int getIndex() {
-		return index;
-	}
-	public void setIndex(int index) {
-		this.index = index;
-	}
-	public boolean isAscendant() {
-		return ascendant;
-	}
-	public void setAscendant(boolean ascendant) {
-		this.ascendant = ascendant;
-	}
+    public boolean isInGroupby() {
+        return inGroupby;
+    }
+
+    public void setInGroupby(boolean inGroupby) {
+        this.inGroupby = inGroupby;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+    public void setIndex(int index) {
+        this.index = index;
+    }
+
+    public boolean isAscendant() {
+        return ascendant;
+    }
+
+    public void setAscendant(boolean ascendant) {
+        this.ascendant = ascendant;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
index 1360e0c..2457b4e 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java
@@ -25,45 +25,45 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SortOptionsParser {
-	private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
-	private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
-		
-	public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){
-		List<SortOption> list = new ArrayList<SortOption>();
-		for(String sortOption : sortOptions){
-			Matcher m = pattern.matcher(sortOption);
-			if(!m.find()){
-				throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
-			}
-			String field = m.group(1);
-			if (sortFields != null) {
-				sortFields.add(field);
-			}
-			SortOption so = new SortOption();
-			list.add(so);
-			so.setAscendant(m.group(2).equals("asc") ? true : false);
-			int index = aggregatedFields.indexOf(field); 
-			if(index > -1){
-				so.setInGroupby(false);
-				so.setIndex(index);
-				continue;
-			}
-			if(groupbyFields != null){  // if groupbyFields is not provided, ignore this sort field
-				index = groupbyFields.indexOf(field);
-				if(index > -1){
-					so.setInGroupby(true);
-					so.setIndex(index);
-					continue;
-				}
-			}
-			logNonExistingSortByField(field);
-			so.setInGroupby(false);
-			so.setIndex(-1);
-		}
-		return list;
-	}
-	
-	private static void logNonExistingSortByField(String sortByField){
-		LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
+    private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
+
+    public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields) {
+        List<SortOption> list = new ArrayList<SortOption>();
+        for (String sortOption : sortOptions) {
+            Matcher m = pattern.matcher(sortOption);
+            if (!m.find()) {
+                throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
+            }
+            String field = m.group(1);
+            if (sortFields != null) {
+                sortFields.add(field);
+            }
+            SortOption so = new SortOption();
+            list.add(so);
+            so.setAscendant(m.group(2).equals("asc") ? true : false);
+            int index = aggregatedFields.indexOf(field);
+            if (index > -1) {
+                so.setInGroupby(false);
+                so.setIndex(index);
+                continue;
+            }
+            if (groupbyFields != null) {  // if groupbyFields is not provided, ignore this sort field
+                index = groupbyFields.indexOf(field);
+                if (index > -1) {
+                    so.setInGroupby(true);
+                    so.setIndex(index);
+                    continue;
+                }
+            }
+            logNonExistingSortByField(field);
+            so.setInGroupby(false);
+            so.setIndex(-1);
+        }
+        return list;
+    }
+
+    private static void logNonExistingSortByField(String sortByField) {
+        LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
index d8b781e..f4eabcd 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
@@ -18,18 +18,18 @@ package org.apache.eagle.query.aggregate.timeseries;
 
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
-public class SynchronizedAggregator implements Aggregator{
-	private Object mutex = new Object();
-	private Aggregator agg;
-	
-	public SynchronizedAggregator(Aggregator agg){
-		this.agg = agg;
-	}
-	
-	@Override
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		synchronized(mutex){
-			agg.accumulate(entity);
-		}
-	}
-}	
+public class SynchronizedAggregator implements Aggregator {
+    private Object mutex = new Object();
+    private Aggregator agg;
+
+    public SynchronizedAggregator(Aggregator agg) {
+        this.agg = agg;
+    }
+
+    @Override
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        synchronized (mutex) {
+            agg.accumulate(entity);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
index 7c1412e..baa89be 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
@@ -19,18 +19,18 @@ package org.apache.eagle.query.aggregate.timeseries;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.EntityCreationListener;
 
-public class SynchronizedEntityCreationListener implements EntityCreationListener{
-	private Object mutex = new Object();
-	private EntityCreationListener listener;
-	
-	public SynchronizedEntityCreationListener(EntityCreationListener listener){
-		this.listener = listener;
-	}
-	
-	@Override
-	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-		synchronized(mutex){
-			listener.entityCreated(entity);
-		}
-	}
+public class SynchronizedEntityCreationListener implements EntityCreationListener {
+    private Object mutex = new Object();
+    private EntityCreationListener listener;
+
+    public SynchronizedEntityCreationListener(EntityCreationListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+        synchronized (mutex) {
+            listener.entityCreated(entity);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
index 5bebe13..e142657 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would 
+ * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would
  * save memory for holding all the data in the memory
  *
  * <h3>Aggregate Bucket Structure</h3>
@@ -41,129 +41,135 @@ import java.util.Map;
  *
  */
 public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable {
-	private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
-	private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
-	private long startTime;
-	private long endTime;
-	private long intervalms;
-	private int numFunctions;
-	private int ignoredEntityCounter = 0;
-	
-	public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
-			long startTime, long endTime, long intervalms){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-		// guard to avoid too many data points returned
-//		validateTimeRange(startTime, endTime, intervalms);
-		this.startTime = startTime;
-		this.endTime = endTime;
-		this.intervalms = intervalms;
-		this.numFunctions = aggregateFuntionTypes.size();
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
+    private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
+    private long startTime;
+    private long endTime;
+    private long intervalms;
+    private int numFunctions;
+    private int ignoredEntityCounter = 0;
 
-//	@Deprecated
-//	public static void validateTimeRange(long startTime, long endTime, long intervalms){
-//		if(startTime >= endTime || intervalms <= 0){
-//			throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
-//		}
-//		if((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT){
-//			throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + ", current # of datapoints is " + (endTime-startTime)/intervalms);
-//		}
-//	}
-	
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<String> groupbyFieldValues = createGroup(entity);
-		// TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
-		// guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
-		if(entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime){
-			if(LOG.isDebugEnabled()) LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
-			this.ignoredEntityCounter ++;
-			return;
-		}
-		// time series bucket index
-		long located =(entity.getTimestamp() - startTime)/intervalms; 
-		groupbyFieldValues.add(String.valueOf(located));
-		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-		bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
-	}
-	
-	public Map<List<String>, List<Double>> result(){
-		if(this.ignoredEntityCounter > 0)
-			LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
-		return bucket.result();
-	}
+    public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
+                                long startTime, long endTime, long intervalms) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+        // guard to avoid too many data points returned
+        //      validateTimeRange(startTime, endTime, intervalms);
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.intervalms = intervalms;
+        this.numFunctions = aggregateFuntionTypes.size();
+    }
 
-	/**
-	 * Support new aggregate result
-	 *
-	 * @return
-	 */
-	@Override
-	public List<GroupbyKeyValue> getGroupbyKeyValues(){
-		if(this.ignoredEntityCounter > 0)
-			LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
-		return bucket.getGroupbyKeyValue();
-	}
-	
-	public Map<List<String>, List<double[]>> getMetric(){
-		// groupbyfields+timeseriesbucket --> aggregatedvalues for different function
-		Map<List<String>, List<Double>> result = bucket.result();
-//		Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
-//		/**
-//		 * bug fix: startTime is inclusive and endTime is exclusive
-//		 */
-////		int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-//		int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
-//		for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
-//			// get groups
-//			List<String> groupbyFields = entry.getKey();
-//			List<String> copy = new ArrayList<String>(groupbyFields);
-//			String strTimeseriesIndex = copy.remove(copy.size()-1);
-//			List<double[]> functionValues = timeseriesDatapoints.get(copy);
-//			if(functionValues == null){
-//				functionValues = new ArrayList<double[]>();
-//				timeseriesDatapoints.put(copy, functionValues);
-//				for(int i=0; i<numFunctions; i++){
-//					functionValues.add(new double[numDatapoints]);
-//				}
-//			}
-//			int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
-//			int functionIndex = 0;
-//			for(double[] values : functionValues){
-//				values[timeseriesIndex] = entry.getValue().get(functionIndex);
-//				functionIndex++;
-//			}
-//		}
-//		return timeseriesDatapoints;
-		return toMetric(result,(int)((endTime-1-startTime)/intervalms + 1),this.numFunctions);
-	}
+    //  @Deprecated
+    //  public static void validateTimeRange(long startTime, long endTime, long intervalms) {
+    //      if (startTime >= endTime || intervalms <= 0) {
+    //          throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and "
+    //                + "interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
+    //      }
+    //      if ((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT) {
+    //          throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT
+    //                  + ", current # of datapoints is " + (endTime-startTime)/intervalms);
+    //      }
+    //  }
 
-	public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){
-		Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
-		/**
-		 * bug fix: startTime is inclusive and endTime is exclusive
-		 */
-//		int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-//		int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
-		for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
-			// get groups
-			List<String> groupbyFields = entry.getKey();
-			List<String> copy = new ArrayList<String>(groupbyFields);
-			String strTimeseriesIndex = copy.remove(copy.size()-1);
-			List<double[]> functionValues = timeseriesDatapoints.get(copy);
-			if(functionValues == null){
-				functionValues = new ArrayList<double[]>();
-				timeseriesDatapoints.put(copy, functionValues);
-				for(int i=0; i<numFunctions; i++){
-					functionValues.add(new double[numDatapoints]);
-				}
-			}
-			int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
-			int functionIndex = 0;
-			for(double[] values : functionValues){
-				values[timeseriesIndex] = entry.getValue().get(functionIndex);
-				functionIndex++;
-			}
-		}
-		return timeseriesDatapoints;
-	}
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<String> groupbyFieldValues = createGroup(entity);
+        // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
+        // guard the time range: skip entities whose timestamp is >= endTime or < startTime
+        if (entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
+            }
+            this.ignoredEntityCounter ++;
+            return;
+        }
+        // time series bucket index
+        long located = (entity.getTimestamp() - startTime) / intervalms;
+        groupbyFieldValues.add(String.valueOf(located));
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+    }
+
+    /**
+     * Returns the flat aggregation result held by the bucket.
+     * A warning is logged first when entities were ignored for being outside the time range.
+     *
+     * @return the map produced by bucket.result()
+     */
+    public Map<List<String>, List<Double>> result() {
+        final boolean anyIgnored = this.ignoredEntityCounter > 0;
+        if (anyIgnored) {
+            String reason = "Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime;
+            LOG.warn(reason);
+        }
+        return bucket.result();
+    }
+
+    /**
+     * Returns the aggregation result in the newer group-by key/value form.
+     * A warning is logged first when entities were ignored for being outside the time range.
+     *
+     * @return the list produced by bucket.getGroupbyKeyValue()
+     */
+    @Override
+    public List<GroupbyKeyValue> getGroupbyKeyValues() {
+        if (this.ignoredEntityCounter <= 0) {
+            return bucket.getGroupbyKeyValue();
+        }
+        LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime);
+        return bucket.getGroupbyKeyValue();
+    }
+
+    /**
+     * Returns the aggregated values as one time series per group and aggregate function.
+     *
+     * @return the map built by toMetric from bucket.result()
+     */
+    public Map<List<String>, List<double[]>> getMetric() {
+        // groupbyfields+timeseriesbucket --> aggregatedvalues for different function
+        Map<List<String>, List<Double>> flat = bucket.result();
+        // startTime is inclusive and endTime is exclusive, hence (endTime - 1)
+        int numDatapoints = (int)((endTime - 1 - startTime) / intervalms + 1);
+        return toMetric(flat, numDatapoints, this.numFunctions);
+    }
+
+    /**
+     * Pivots flat aggregate results into one time series per group.
+     * The last element of every key in result is parsed as the time-slot index; the remaining elements identify the group.
+     *
+     * @param result map from (group-by values + slot index) to one aggregated value per function
+     * @param numDatapoints length of every series array
+     * @param numFunctions number of series arrays created per group
+     * @return map from group-by values to numFunctions arrays of length numDatapoints; slots with no entry stay 0
+     */
+    public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result, int numDatapoints, int numFunctions) {
+        Map<List<String>, List<double[]>> seriesByGroup = new HashMap<List<String>, List<double[]>>();
+        for (Map.Entry<List<String>, List<Double>> flat : result.entrySet()) {
+            // split the key: leading elements name the group, the trailing one is the slot index
+            List<String> group = new ArrayList<String>(flat.getKey());
+            int slot = Integer.parseInt(group.remove(group.size() - 1));
+            List<double[]> series = seriesByGroup.get(group);
+            if (series == null) {
+                series = new ArrayList<double[]>();
+                for (int i = 0; i < numFunctions; i++) {
+                    series.add(new double[numDatapoints]);
+                }
+                seriesByGroup.put(group, series);
+            }
+            for (int fn = 0; fn < series.size(); fn++) {
+                series.get(fn)[slot] = flat.getValue().get(fn);
+            }
+        }
+        return seriesByGroup;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
index d662658..78fa010 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
@@ -26,51 +26,51 @@ import org.slf4j.LoggerFactory;
  * only numeric aggregation is supported and number type supported is double
  */
 public class TimeSeriesBucket {
-	private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
-	private long startTime;
-	private long endTime;
-	private long interval;
-	
-	// map of aggregation function to aggregated values 
-	List<double[]> aggregatedValues = new ArrayList<double[]>();
-	
-	// align from the startTime
-	/**
-	 * 
-	 * @param startTime milliseconds
-	 * @param endTime milliseconds
-	 * @param intervalMillseconds
-	 * @param aggFunctions
-	 */
-	public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){
-		int count =(int)((endTime-startTime)/intervalms);
-		for(int i=0; i<numAggFunctions; i++){
-			aggregatedValues.add(new double[count]);
-		}
-	}
-	
-	/**
-	 * add datapoint which has a list of values for different aggregate functions
-	 * for example, sum(numHosts), count(*), avg(timespan) etc
-	 * @param timestamp
-	 * @param values
-	 */
-	public void addDataPoint(long timestamp, List<Double> values){
-		// locate timeseries bucket
-		if(timestamp < startTime || timestamp > endTime){
-			LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
-			return;
-		}
-		int located =(int)((timestamp - startTime)/interval);
-		int index = 0;
-		for(Double src : values){
-			double[] timeSeriesValues = aggregatedValues.get(index);
-			timeSeriesValues[located] += src;
-			index++;
-		}
-	}
-	
-	public List<double[]> aggregatedValues(){
-		return this.aggregatedValues;
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
+    private final long startTime;
+    private final long endTime;
+    private final long interval;
+
+    // one double[] per aggregate function; each array has one slot per interval
+    List<double[]> aggregatedValues = new ArrayList<double[]>();
+
+    /**
+     * Creates a bucket covering [startTime, endTime) with slots aligned from startTime.
+     *
+     * @param startTime inclusive window start, milliseconds
+     * @param endTime exclusive window end, milliseconds
+     * @param intervalms slot width, milliseconds
+     * @param numAggFunctions number of aggregate functions (one value array is allocated per function)
+     */
+    public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions) {
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.interval = intervalms;
+        int count = (int)((endTime - startTime) / intervalms);
+        for (int i = 0; i < numAggFunctions; i++) {
+            aggregatedValues.add(new double[count]);
+        }
+    }
+
+    /**
+     * Adds a datapoint carrying one value per aggregate function,
+     * for example, sum(numHosts), count(*), avg(timespan) etc.
+     * @param timestamp datapoint time in milliseconds; ignored with a warning when outside [startTime, endTime)
+     * @param values values to add, in the same order as the aggregate functions
+     */
+    public void addDataPoint(long timestamp, List<Double> values) {
+        // endTime is exclusive: a timestamp equal to endTime would index one slot past the arrays
+        if (timestamp < startTime || timestamp >= endTime) {
+            LOG.warn("timestamp<startTime or timestamp>=endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
+            return;
+        }
+        int located = (int)((timestamp - startTime) / interval);
+        for (int i = 0; i < values.size(); i++) {
+            aggregatedValues.get(i)[located] += values.get(i);
+        }
+    }
+
+    public List<double[]> aggregatedValues() {
+        return this.aggregatedValues;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
index c0a6e06..ae00fdf 100644
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
@@ -25,127 +25,127 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 public class TimeSeriesPostFlatAggregateSort {
-	// private static final Logger logger =
-	// LoggerFactory.getLogger(PostFlatAggregateSort.class);
-
-	private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
-			Map<List<String>, List<Double>> mapForSort,
-			List<SortOption> sortOptions) {
-		SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(
-				new MapEntryComparator(sortOptions));
-		sortedEntries.addAll(mapForSort.entrySet());
-		return sortedEntries;
-	}
-
-	/**
-	 * sort aggregated results with sort options
-	 * 
-	 * @param entity
-	 */
-	public static List<Map.Entry<List<String>, List<double[]>>> sort(
-			Map<List<String>, List<Double>> mapForSort,
-			Map<List<String>, List<double[]>> valueMap,
-			List<SortOption> sortOptions, int topN) {
-
-		processIndex(sortOptions);
-		List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
-		SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue(
-				mapForSort, sortOptions);
-		for (Map.Entry<List<String>, List<Double>> entry : sortedSet) {
-			List<String> key = entry.getKey();
-			List<double[]> value = valueMap.get(key);
-			if (value != null) {
-				Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value);
-				result.add(newEntry);
-				if (topN > 0 && result.size() >= topN) {
-					break;
-				}
-			}
-		}
-		return result;
-	}
-
-	private static void processIndex(List<SortOption> sortOptions) {
-		for (int i = 0; i < sortOptions.size(); ++i) {
-			SortOption so = sortOptions.get(i);
-			so.setIndex(i);
-		}
-	}
-
-	private static class MapEntryComparator implements
-			Comparator<Map.Entry<List<String>, List<Double>>> {
-		private List<SortOption> sortOptions;
-
-		public MapEntryComparator(List<SortOption> sortOptions) {
-			this.sortOptions = sortOptions;
-		}
-
-		/**
-		 * default to sort by all groupby fields
-		 */
-		@Override
-		public int compare(Map.Entry<List<String>, List<Double>> e1,
-				Map.Entry<List<String>, List<Double>> e2) {
-			int r = 0;
-			List<String> keyList1 = e1.getKey();
-			List<Double> valueList1 = e1.getValue();
-			List<String> keyList2 = e2.getKey();
-			List<Double> valueList2 = e2.getValue();
-			for (SortOption so : sortOptions) {
-				int index = so.getIndex();
-				if (index == -1) {
-					continue;
-				}
-				if (!so.isInGroupby()) { // sort fields come from functions
-					Double value1 = valueList1.get(index);
-					Double value2 = valueList2.get(index);
-					r = value1.compareTo(value2);
-				} else { // sort fields come from groupby fields
-					String key1 = keyList1.get(index);
-					String key2 = keyList2.get(index);
-					r = key1.compareTo(key2);
-				}
-				if (r == 0)
-					continue;
-				if (!so.isAscendant()) {
-					r = -r;
-				}
-				return r;
-			}
-			// default to sort by groupby fields ascendently
-			if (r == 0) { // TODO is this check necessary
-				return new GroupbyFieldsComparator()
-						.compare(keyList1, keyList2);
-			}
-			return r;
-		}
-	}
-
-	static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
-		private final K key;
-		private final V value;
-
-		ImmutableEntry(K key, V value) {
-			this.key = key;
-			this.value = value;
-		}
-
-		@Override
-		public K getKey() {
-			return key;
-		}
-
-		@Override
-		public V getValue() {
-			return value;
-		}
-
-		@Override
-		public final V setValue(V value) {
-			throw new UnsupportedOperationException();
-		}
-
-		private static final long serialVersionUID = 0;
-	}
+    // private static final Logger logger =
+    // LoggerFactory.getLogger(PostFlatAggregateSort.class);
+
+    /**
+     * Collects the entries of mapForSort into a TreeSet ordered by a MapEntryComparator built from sortOptions.
+     */
+    private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
+        Map<List<String>, List<Double>> mapForSort, List<SortOption> sortOptions) {
+        Comparator<Map.Entry<List<String>, List<Double>>> order = new MapEntryComparator(sortOptions);
+        SortedSet<Map.Entry<List<String>, List<Double>>> ordered = new TreeSet<Map.Entry<List<String>, List<Double>>>(order);
+        ordered.addAll(mapForSort.entrySet());
+        return ordered;
+    }
+
+    /**
+     * Sorts aggregated results with the given sort options and returns the matching time series.
+     *
+     * @param mapForSort flat aggregate values per group, used only to decide the order
+     * @param valueMap time series per group; groups with no entry in this map are skipped
+     * @param sortOptions sort criteria; each option's index is overwritten with its position in this list
+     * @param topN maximum number of entries to return; zero or a negative value means no limit
+     * @return entries of valueMap in sorted order
+     */
+    public static List<Map.Entry<List<String>, List<double[]>>> sort(
+        Map<List<String>, List<Double>> mapForSort,
+        Map<List<String>, List<double[]>> valueMap,
+        List<SortOption> sortOptions, int topN) {
+        processIndex(sortOptions);
+        List<Map.Entry<List<String>, List<double[]>>> top = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
+        for (Map.Entry<List<String>, List<Double>> ranked : sortByValue(mapForSort, sortOptions)) {
+            List<String> group = ranked.getKey();
+            List<double[]> series = valueMap.get(group);
+            if (series == null) {
+                continue;
+            }
+            top.add(new ImmutableEntry<List<String>, List<double[]>>(group, series));
+            if (topN > 0 && top.size() >= topN) {
+                break;
+            }
+        }
+        return top;
+    }
+
+    /** Sets the index of every sort option to its position in the list. */
+    private static void processIndex(List<SortOption> sortOptions) {
+        for (int position = 0; position < sortOptions.size(); position++) {
+            sortOptions.get(position).setIndex(position);
+        }
+    }
+
+    /**
+     * Orders flat aggregate entries by the configured sort options, then by their group-by keys.
+     */
+    private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> {
+        private final List<SortOption> sortOptions;
+
+        public MapEntryComparator(List<SortOption> sortOptions) {
+            this.sortOptions = sortOptions;
+        }
+
+        /**
+         * Returns the first non-zero comparison produced by the sort options, negated for
+         * non-ascendant options. Options whose index is -1 are skipped. If every option ties,
+         * the keys are compared with a GroupbyFieldsComparator.
+         */
+        @Override
+        public int compare(Map.Entry<List<String>, List<Double>> e1,
+                           Map.Entry<List<String>, List<Double>> e2) {
+            List<String> leftKeys = e1.getKey();
+            List<String> rightKeys = e2.getKey();
+            for (SortOption option : sortOptions) {
+                int position = option.getIndex();
+                if (position == -1) {
+                    continue;
+                }
+                // the sort field is either a groupby field (compare keys) or a function (compare values)
+                int order = option.isInGroupby()
+                    ? leftKeys.get(position).compareTo(rightKeys.get(position))
+                    : e1.getValue().get(position).compareTo(e2.getValue().get(position));
+                if (order != 0) {
+                    return option.isAscendant() ? order : -order;
+                }
+            }
+            // only reached when every applied option tied, so no further zero check is needed:
+            // default to sorting by the groupby fields
+            return new GroupbyFieldsComparator().compare(leftKeys, rightKeys);
+        }
+    }
+
+    /**
+     * Serializable key/value pair whose value cannot be replaced: setValue always throws.
+     */
+    static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
+        private static final long serialVersionUID = 0;
+
+        private final K key;
+        private final V value;
+
+        ImmutableEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /** Returns the key given at construction. */
+        @Override
+        public K getKey() {
+            return this.key;
+        }
+
+        /** Returns the value given at construction. */
+        @Override
+        public V getValue() {
+            return this.value;
+        }
+
+        /** Unsupported: always throws UnsupportedOperationException. */
+        @Override
+        public final V setValue(V value) {
+            throw new UnsupportedOperationException();
+        }
+    }
 
 }