You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:16 UTC

[09/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java
deleted file mode 100755
index e94487e..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.raw;
-
-import eagle.log.entity.QualifierCreationListener;
-import eagle.log.entity.meta.EntityDefinition;
-import eagle.query.aggregate.AggregateFunctionType;
-
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-
-public class RawAggregator implements QualifierCreationListener,GroupbyKeyAggregatable {
-	private List<String> groupbyFields;
-	private GroupbyKey key;
-	private static final byte[] UNASSIGNED = "unassigned".getBytes();
-	private RawGroupbyBucket bucket;
-
-	public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed){
-		this.groupbyFields = groupbyFields;
-		key = new GroupbyKey();
-		bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed);
-	}
-
-	@Override
-	public void qualifierCreated(Map<String, byte[]> qualifiers){
-		key.clear();
-		ListIterator<String> it = groupbyFields.listIterator();
-		while(it.hasNext()){
-			byte[] groupbyFieldValue = qualifiers.get(it.next());
-			if(groupbyFieldValue == null){
-				key.addValue(UNASSIGNED);
-			}else{
-				key.addValue(groupbyFieldValue);
-			}
-		}
-		GroupbyKey newKey = null;
-		if(bucket.exists(key)){
-			newKey = key;
-		}else{
-			newKey = new GroupbyKey(key);
-		}
-		
-		bucket.addDatapoint(newKey, qualifiers);
-	}
-
-	/**
-	 * @return
-	 */
-	public Map<List<String>, List<Double>> result(){
-		return bucket.result();
-	}
-
-	public List<GroupbyKeyValue> getGroupbyKeyValues(){
-		return bucket.groupbyKeyValues();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java
deleted file mode 100755
index e633723..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.raw;
-
-import eagle.log.entity.EntityQualifierUtils;
-import eagle.log.entity.GenericMetricEntity;
-import eagle.log.entity.meta.*;
-import eagle.log.expression.ExpressionParser;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.parser.TokenConstant;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class RawGroupbyBucket {
-	private final static Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class);
-
-	private List<String> aggregatedFields;
-	private EntityDefinition entityDefinition;
-
-	
-	private List<AggregateFunctionType> types;
-	private SortedMap<GroupbyKey, List<Function>> group2FunctionMap =
-			new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator());
-
-	public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed){
-		this.types = types;
-		this.aggregatedFields = aggregatedFields;
-		this.entityDefinition = ed;
-	}
-
-	public boolean exists(GroupbyKey key){
-		return group2FunctionMap.containsKey(key);
-	}
-
-	public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values){
-		// locate groupby bucket
-		List<Function> functions = group2FunctionMap.get(groupbyKey);
-		if(functions == null){
-			functions = new ArrayList<Function>();
-			for(AggregateFunctionType type : types){
-				FunctionFactory ff = FunctionFactory.locateFunctionFactory(type);
-				if(ff == null){
-					LOG.error("FunctionFactory of AggregationFunctionType:"+type+" is null");
-				}else{
-					functions.add(ff.createFunction());
-				}
-			}
-			group2FunctionMap.put(groupbyKey, functions);
-		}
-		ListIterator<Function> e1 = functions.listIterator();
-		ListIterator<String> e2 = aggregatedFields.listIterator();
-		while(e1.hasNext() && e2.hasNext()){
-			Function f = e1.next();
-			String aggregatedField = e2.next();
-			byte[] v = values.get(aggregatedField);
-			if(f instanceof Function.Count){ // handle count
-				if(entityDefinition.getMetricDefinition()==null) {
-					f.run(1.0);
-					continue;
-				}else if(v == null){
-					aggregatedField = GenericMetricEntity.VALUE_FIELD;
-					v = values.get(aggregatedField);
-				}
-			}
-			if(v != null){
-				Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField);
-				EntitySerDeser<?> serDeser = q.getSerDeser();
-				// double d = 0.0;
-				if(serDeser instanceof IntSerDeser){
-					double d= (Integer)serDeser.deserialize(v);
-					f.run(d);
-				}else if(serDeser instanceof LongSerDeser){
-					double d = (Long)serDeser.deserialize(v);
-					f.run(d);
-				}else if(serDeser instanceof DoubleSerDeser){
-					double d = (Double)serDeser.deserialize(v);
-					f.run(d);
-				// TODO: support numeric array type that is not metric
-				}else if(serDeser instanceof DoubleArraySerDeser){
-					double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v);
-					if(f instanceof Function.Count){
-						f.run(d.length);
-					} else {
-						for(double i:d) f.run(i);
-					}
-				}else if(serDeser instanceof IntArraySerDeser){
-					int[] d = ((IntArraySerDeser) serDeser).deserialize(v);
-					if(f instanceof Function.Count){
-						f.run(d.length);
-					}else{
-						for(int i:d) f.run(i);
-					}
-				}else{
-					if(LOG.isDebugEnabled()) LOG.debug("EntitySerDeser of field "+aggregatedField+" is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0");
-				}
-			}else if(TokenConstant.isExpression(aggregatedField)){
-				String expression = TokenConstant.parseExpressionContent(aggregatedField);
-				try {
-					Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition);
-					if(entityDefinition.getMetricDefinition() == null) {
-						double value = ExpressionParser.eval(expression,doubleMap);
-						// LOG.info("DEBUG: Eval "+expression +" = "+value);
-						f.run(value);
-					}else{
-						Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
-						EntitySerDeser _serDeser = qualifier.getSerDeser();
-						byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD);
-						if( _serDeser instanceof DoubleArraySerDeser){
-							double[] d = (double[]) _serDeser.deserialize(valueBytes);
-							if(f instanceof Function.Count) {
-								f.run(d.length);
-							}else{
-								for(double i:d){
-									doubleMap.put(GenericMetricEntity.VALUE_FIELD,i);
-									f.run(ExpressionParser.eval(expression, doubleMap));
-								}
-							}
-						}else if(_serDeser instanceof IntArraySerDeser){
-							int[] d = (int[]) _serDeser.deserialize(valueBytes);
-							if(f instanceof Function.Count) {
-								f.run(d.length);
-							}else {
-								for (double i : d) {
-									doubleMap.put(GenericMetricEntity.VALUE_FIELD, i);
-									f.run(ExpressionParser.eval(expression, doubleMap));
-								}
-							}
-						}else{
-							double value = ExpressionParser.eval(expression,doubleMap);
-							f.run(value);
-						}
-					}
-				} catch (Exception e) {
-					LOG.error("Got exception to evaluate expression: "+expression+", exception: "+e.getMessage(),e);
-				}
-			}
-		}
-	}
-
-	/**
-	 * expensive operation - create objects and format the result
-	 * @return
-	 */
-	public List<GroupbyKeyValue> groupbyKeyValues(){
-		List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
-		for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
-			GroupbyValue value = new GroupbyValue();
-			for(Function f : entry.getValue()){
-				value.add(new DoubleWritable(f.result()));
-				value.addMeta(f.count());
-			}
-			results.add(new GroupbyKeyValue(entry.getKey(),value));
-		}
-		return results;
-	}
-
-	/**
-	 * expensive operation - create objects and format the result
-	 * @return
-	 */
-	public Map<List<String>, List<Double>> result(){
-		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
-		for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
-			List<Double> values = new ArrayList<Double>();
-			for(Function f : entry.getValue()){
-				values.add(f.result());
-			}
-			GroupbyKey key = entry.getKey();
-			List<BytesWritable> list1 = key.getValue();
-			List<String> list2 = new ArrayList<String>();
-			for(BytesWritable e : list1){
-				list2.add(new String(e.copyBytes()));
-			}
-			result.put(list2, values);
-		}
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java
deleted file mode 100755
index db774c1..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.raw;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
-import java.util.ArrayList;
-
-/**
- * @since : 11/6/14,2014
- */
-public class WritableList<E extends Writable> extends ArrayList<E> implements Writable{
-	private Class<E> itemTypeClass;
-
-	public WritableList(Class<E> typeClass){
-		this.itemTypeClass = typeClass;
-	}
-
-	public WritableList(Class<E> typeClass,int initialCapacity){
-		super(initialCapacity);
-		this.itemTypeClass = typeClass;
-	}
-
-
-	/**
-	 * <h3> Get item class by </h3>
-	 * <pre>
-	 * (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-	 * </pre>
-	 */
-	@Deprecated
-	public WritableList(){
-		this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-	}
-
-	private void check() throws IOException{
-		if(this.itemTypeClass == null){
-			throw new IOException("Class Type of WritableArrayList<E extends Writable> is null");
-		}
-	}
-
-	public Class<E> getItemClass(){
-		return itemTypeClass;
-	}
-
-	/**
-	 * Serialize the fields of this object to <code>out</code>.
-	 *
-	 * @param out <code>DataOuput</code> to serialize this object into.
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public void write(DataOutput out) throws IOException {
-		this.check();
-		out.writeInt(this.size());
-		for(Writable item: this){
-			item.write(out);
-		}
-	}
-
-	/**
-	 * Deserialize the fields of this object from <code>in</code>.
-	 * <p/>
-	 * <p>For efficiency, implementations should attempt to re-use storage in the
-	 * existing object where possible.</p>
-	 *
-	 * @param in <code>DataInput</code> to deseriablize this object from.
-	 * @throws java.io.IOException
-	 */
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		this.check();
-		int size = in.readInt();
-		for(int i=0;i<size;i++){
-			try {
-				E item = itemTypeClass.newInstance();
-				item.readFields(in);
-				this.add(item);
-			} catch (InstantiationException e) {
-				throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
-			} catch (IllegalAccessException e) {
-				throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java
deleted file mode 100755
index c857f5c..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.EntityCreationListener;
-import eagle.log.expression.ExpressionParser;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.aggregate.IllegalAggregateFieldTypeException;
-import eagle.query.parser.TokenConstant;
-import org.apache.commons.beanutils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.beans.PropertyDescriptor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class AbstractAggregator implements Aggregator, EntityCreationListener{
-	private final static Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class);
-
-	private static final String UNASSIGNED = "unassigned";
-	protected List<String> groupbyFields;
-	protected List<AggregateFunctionType> aggregateFunctionTypes;
-	protected List<String> aggregatedFields;
-	// a cache to know immediately if groupby field should come from tags(true) or qualifiers(false)
-	private Boolean[] _groupbyFieldPlacementCache;
-	private Method[] _aggregateFieldReflectedMethodCache;
-
-	public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-		this.groupbyFields = groupbyFields;
-		this.aggregateFunctionTypes = aggregateFuntionTypes;
-		this.aggregatedFields = aggregatedFields;
-		_aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()];
-		_groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()];
-	}
-	
-	@Override
-	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-		accumulate(entity);
-	}
-	
-	public abstract Object result();
-	
-	protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i){
-		String groupbyFieldValue = entity.getTags().get(groupbyField);
-		if(groupbyFieldValue != null){
-			_groupbyFieldPlacementCache[i] = true;
-			return groupbyFieldValue;
-		}
-		return null;
-	}
-	
-	protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i){
-		try{
-			PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField);
-			if(pd == null)
-				return null;
-//			_groupbyFieldPlacementCache.put(groupbyField, false);
-			_groupbyFieldPlacementCache[i] = false;
-			return (String)(pd.getReadMethod().invoke(entity));
-		}catch(NoSuchMethodException ex){
-			return null;
-		}catch(InvocationTargetException ex){
-			return null;
-		}catch(IllegalAccessException ex){
-			return null;
-		}
-	}
-	
-	protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i){
-		Boolean placement = _groupbyFieldPlacementCache[i];
-		String groupbyFieldValue = null; 
-		if(placement != null){
-			groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i); 
-		}else{
-			groupbyFieldValue = createGroupFromTags(entity, groupbyField, i);
-			if(groupbyFieldValue == null){
-				groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i);
-			}
-		}
-		groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue);
-		return groupbyFieldValue;
-	}
-	
-	/**
-	 * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or 
-	 * number of non-null field
-	 * For other aggregation, like sum,min,max,avg, we should resort to qualifiers
-	 * @param entity
-	 * @return
-	 */
-	protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception{
-		List<Double> values = new ArrayList<Double>();
-		int functionIndex = 0;
-		for(AggregateFunctionType type : aggregateFunctionTypes){
-			if(type.name().equals(AggregateFunctionType.count.name())){
-				values.add(new Double(1));
-			}else{
-				// find value in qualifier by checking java bean
-				String aggregatedField = aggregatedFields.get(functionIndex);
-				if(TokenConstant.isExpression(aggregatedField)){
-					try {
-						String expr = TokenConstant.parseExpressionContent(aggregatedField);
-						values.add(ExpressionParser.eval(expr, entity));
-					}catch (Exception ex){
-						LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex);
-						throw ex;
-					}
-				}else {
-					try {
-						Method m = _aggregateFieldReflectedMethodCache[functionIndex];
-						if (m == null) {
-//						pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField);
-//						if (pd == null) {
-//							final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName();
-//							logger.error(errMsg);
-//							throw new Exception(errMsg);
-//						}
-//						Object obj = pd.getReadMethod().invoke(entity);
-							String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1);
-							m = entity.getClass().getMethod("get" + tmp);
-							_aggregateFieldReflectedMethodCache[functionIndex] = m;
-						}
-						Object obj = m.invoke(entity);
-						values.add(numberToDouble(obj));
-					} catch (Exception ex) {
-						LOG.error("Cannot do aggregation for field " + aggregatedField, ex);
-						throw ex;
-					}
-				}
-			}
-			functionIndex++;
-		}
-		return values;
-	}
-	
-	/**
-	 * TODO this is a hack, we need elegant way to convert type to a broad precision
-     *
-	 * @param obj
-	 * @return
-	 */
-	protected Double numberToDouble(Object obj){
-		if(obj instanceof Double)
-			return (Double)obj;
-		if(obj instanceof Integer){
-			return new Double(((Integer)obj).doubleValue());
-		}
-		if(obj instanceof Long){
-			return new Double(((Long)obj).doubleValue());
-		}
-		// TODO hack to support string field for demo purpose, should be removed
-		if(obj == null){
-			return new Double(0.0);
-		}
-		if(obj instanceof String){
-			try{
-				return new Double((String)obj);
-			}catch(Exception ex){
-				LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex);
-				return new Double(0.0);
-			}
-		}
-		
-		throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java
deleted file mode 100755
index fd07526..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-public interface Aggregator {
-    /**
-     * Accumulate callback
-     *
-     * @param entity accumulated entity instance
-     * @throws Exception
-     */
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
deleted file mode 100644
index 2d492b7..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.entity.EntityCreationListener;
-
-public class EntityCreationListenerFactory {
-	public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener){
-		return new SynchronizedEntityCreationListener(listener);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java
deleted file mode 100755
index dcc6b84..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.query.aggregate.AggregateFunctionType;
-
-/**
- * Not thread safe
- */
-public class FlatAggregator extends AbstractAggregator{
-	protected GroupbyBucket bucket;
-
-    /**
-     * @param groupbyFields
-     * @param aggregateFuntionTypes
-     * @param aggregatedFields
-     */
-	public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-		bucket = new GroupbyBucket(this.aggregateFunctionTypes);
-	}
-	
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<String> groupbyFieldValues = createGroup(entity);
-		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-		bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
-	}
-	
-	public Map<List<String>, List<Double>> result(){
-		return bucket.result(); 
-	}
-	
-	protected List<String> createGroup(TaggedLogAPIEntity entity){
-		List<String> groupbyFieldValues = new ArrayList<String>();
-		int i = 0;
-		for(String groupbyField : groupbyFields){
-			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
-			groupbyFieldValues.add(groupbyFieldValue);
-		}
-		return groupbyFieldValues;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java
deleted file mode 100755
index 6475462..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.query.QueryConstants;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.aggregate.raw.GroupbyKey;
-import eagle.query.aggregate.raw.GroupbyKeyValue;
-import eagle.query.aggregate.raw.GroupbyValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Keeps, for every group key, one aggregate {@link Function} per configured
- * {@link AggregateFunctionType} and feeds incoming datapoints into those functions.
- * <p>
- * The instance state is held in a plain {@link HashMap} without any synchronization.
- */
-public class GroupbyBucket {
-	private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
-
-	/** Function factories keyed by {@link AggregateFunctionType#name()}. */
-	public static Map<String, FunctionFactory> _functionFactories =
-			new HashMap<>();
-
-	// TODO put this logic to AggregatorFunctionType
-	static{
-		_functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
-		_functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
-		_functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
-		_functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
-		_functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
-	}
-
-	// function types; one Function per type is created for every new group, in this order
-	private final List<AggregateFunctionType> types;
-
-	// group key -> functions of that group (unordered; sorting is left to the caller)
-	private final Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>();
-
-	/**
-	 * @param types aggregate function types applied to every group
-	 */
-	public GroupbyBucket(List<AggregateFunctionType> types){
-		this.types = types;
-	}
-
-	/**
-	 * Feeds one datapoint into the functions of a group, creating the group on first use.
-	 * The i-th value is passed to the i-th function.
-	 *
-	 * @param groupbyFieldValues group key; the list itself is stored as map key on first use
-	 * @param values one value per aggregate function, in the order of the function types
-	 * @throws IndexOutOfBoundsException if there are more values than function types
-	 */
-	public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){
-		// locate groupby bucket
-		List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
-		if(functions == null){
-			functions = new ArrayList<Function>();
-			for(AggregateFunctionType type : types){
-				functions.add(_functionFactories.get(type.name()).createFunction());
-			}
-			group2FunctionMap.put(groupbyFieldValues, functions);
-		}
-		int functionIndex = 0;
-		for(Double v : values){
-			functions.get(functionIndex).run(v);
-			functionIndex++;
-		}
-	}
-
-	/**
-	 * @return new map from group key to the current result of each function of that group
-	 */
-	public Map<List<String>, List<Double>> result(){
-		Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
-		for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
-			List<Double> values = new ArrayList<Double>();
-			for(Function f : entry.getValue()){
-				values.add(f.result());
-			}
-			result.put(entry.getKey(), values);
-		}
-		return result;
-	}
-
-	/**
-	 * Same content as {@link #result()}, expressed as {@link GroupbyKeyValue} objects: the key
-	 * holds the encoded bytes of every group key component, the value holds each function's
-	 * result together with its datapoint count as meta.
-	 *
-	 * @return one {@link GroupbyKeyValue} per group
-	 */
-	public List<GroupbyKeyValue> getGroupbyKeyValue(){
-		List<GroupbyKeyValue>  results = new ArrayList<GroupbyKeyValue>();
-
-		for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
-			GroupbyKey key = new GroupbyKey();
-			for(String keyStr:entry.getKey()){
-				try {
-					key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
-				} catch (UnsupportedEncodingException e) {
-					// best effort: the failure is logged and this key component is left out
-					LOG.error(e.getMessage(),e);
-				}
-			}
-			GroupbyValue value = new GroupbyValue();
-			for(Function f : entry.getValue()){
-				value.add(f.result());
-				value.addMeta(f.count());
-			}
-			results.add(new GroupbyKeyValue(key,value));
-		}
-
-		return results;
-	}
-
-	/** Creates a fresh, independent {@link Function} instance. */
-	public static interface FunctionFactory{
-		public Function createFunction();
-	}
-
-	/**
-	 * Incremental aggregate function: {@link #run(double)} consumes one datapoint,
-	 * {@link #result()} reports the aggregate of everything consumed so far.
-	 */
-	public static abstract class Function{
-		// number of datapoints counted through incrCount()
-		protected int count;
-
-		public abstract void run(double v);
-		public abstract double result();
-		public int count(){
-			return count;
-		}
-		public void incrCount(){
-			count ++;
-		}
-	}
-
-	private static class CountFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Count();
-		}
-	}
-
-	/** Count is computed exactly like {@link Sum}: it adds up the values it is given. */
-	private static class Count extends Sum{
-		public Count(){
-			super();
-		}
-	}
-
-	private static class SumFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Sum();
-		}
-	}
-
-	/** Running sum of all datapoints; 0.0 when nothing has been consumed. */
-	private static class Sum extends Function{
-		private double summary;
-		public Sum(){
-			this.summary = 0.0;
-		}
-		@Override
-		public void run(double v){
-			this.incrCount();
-			this.summary += v;
-		}
-
-		@Override
-		public double result(){
-			return this.summary;
-		}
-	}
-
-	private static class MinFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Min();
-		}
-	}
-
-	/** Smallest datapoint seen so far; {@link Double#MAX_VALUE} when nothing has been consumed. */
-	public static class Min extends Function{
-		private double minimum;
-		public Min(){
-			// every finite double, negative ones included, is <= MAX_VALUE, so this start value
-			// does not restrict the function to positive numbers
-			this.minimum = Double.MAX_VALUE;
-		}
-
-		@Override
-		public void run(double v){
-			if(v < minimum){
-				minimum = v;
-			}
-			this.incrCount();
-		}
-
-		@Override
-		public double result(){
-			return minimum;
-		}
-	}
-
-	private static class MaxFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Max();
-		}
-	}
-
-	/**
-	 * Largest datapoint seen so far. Reports 0.0 when no comparable (non-NaN) datapoint has
-	 * been consumed, which is what this function has always reported for empty input.
-	 */
-	public static class Max extends Function{
-		private double maximum;
-		// false until the first non-NaN datapoint has replaced the placeholder in maximum
-		private boolean hasValue;
-		public Max(){
-			// placeholder only; it must not take part in the comparison, otherwise the maximum
-			// of all-negative datapoints would wrongly be reported as 0.0
-			this.maximum = 0.0;
-			this.hasValue = false;
-		}
-		@Override
-		public void run(double v){
-			// the first non-NaN datapoint always wins over the placeholder; NaN is never stored
-			if(v > maximum || (!hasValue && !Double.isNaN(v))){
-				maximum = v;
-				hasValue = true;
-			}
-			this.incrCount();
-		}
-
-		@Override
-		public double result(){
-			return maximum;
-		}
-	}
-
-	private static class AvgFactory implements FunctionFactory{
-		@Override
-		public Function createFunction(){
-			return new Avg();
-		}
-	}
-
-	/** Arithmetic mean of all datapoints; NaN (0.0/0) when nothing has been consumed. */
-	public static class Avg extends Function{
-		private double total;
-		public Avg(){
-			this.total = 0.0;
-		}
-		@Override
-		public void run(double v){
-			total += v;
-			this.incrCount();
-		}
-		@Override
-		public double result(){
-			return this.total/this.count;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
deleted file mode 100644
index 17e1602..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * Default comparator for aggregation results. Two lists of groupby field values are compared
- * element by element in ascending order; the first pair of elements that differs decides.
- */
-public class GroupbyFieldsComparator implements Comparator<List<String>>{
-	/**
-	 * @param list1 first list of groupby field values
-	 * @param list2 second list of groupby field values
-	 * @return result of the first non-equal element comparison, or 0 if all elements are equal
-	 * @throws IllegalArgumentException if either list is null or the sizes differ
-	 */
-	@Override
-	public int compare(List<String> list1, List<String> list2){
-		boolean comparable = list1 != null && list2 != null && list1.size() == list2.size();
-		if(!comparable){
-			throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
-		}
-		for(int position = 0; position < list1.size(); position++){
-			int order = list1.get(position).compareTo(list2.get(position));
-			if(order != 0){
-				return order;
-			}
-		}
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
deleted file mode 100644
index 737af98..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-/**
- * One node of a hierarchical aggregation result. A node carries its own key, the values
- * aggregated for it and its child nodes, either as a map keyed by child key or as a sorted set
- * of map entries.
- * <p>
- * The class is annotated for JSON serialization with inclusion NON_NULL, so properties that
- * are null are presumably left out of the serialized form (confirm against the Jackson version
- * in use).
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-public class HierarchicalAggregateEntity {
-	// key of this node
-	private String key;
-	// working state: the aggregate functions of this node; starts as an empty list
-	private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
-	// aggregated values of this node; starts as an empty list
-	private List<Double> values = new ArrayList<Double>();
-	// child nodes keyed by child key, kept in the natural order of the keys
-	private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
-	// child entries in a caller-defined order; null until a caller sets it
-	private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
-
-	public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
-		return sortedList;
-	}
-	public void setSortedList(
-			SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
-		this.sortedList = sortedList;
-	}
-	public List<GroupbyBucket.Function> getTmpValues() {
-		return tmpValues;
-	}
-	public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
-		this.tmpValues = tmpValues;
-	}
-	public String getKey() {
-		return key;
-	}
-	public void setKey(String key) {
-		this.key = key;
-	}
-	public List<Double> getValues() {
-		return values;
-	}
-	public void setValues(List<Double> values) {
-		this.values = values;
-	}
-	public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
-		return children;
-	}
-	public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
-		this.children = children;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
deleted file mode 100755
index e114d69..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.List;
-import java.util.SortedMap;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.query.aggregate.AggregateFunctionType;
-
-/**
- * Aggregator that builds a tree of {@link HierarchicalAggregateEntity} nodes. The root
- * aggregates every entity; below it there is one tree level per groupby field, in field order,
- * and every node on the path of an entity aggregates that entity's values.
- */
-public class HierarchicalAggregator extends AbstractAggregator{
-	private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
-
-	/**
-	 * @param groupbyFields forwarded to the superclass; one tree level is built per field
-	 * @param aggregateFuntionTypes forwarded to the superclass
-	 * @param aggregatedFields forwarded to the superclass
-	 */
-	public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-	}
-
-	/**
-	 * Aggregates one entity into the root and into every node on its groupby path, creating
-	 * missing nodes on the way.
-	 *
-	 * @param entity entity to aggregate
-	 */
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-		// aggregate to root first
-		addDatapoint(root, preAggregatedValues);
-		// go through hierarchical tree
-		HierarchicalAggregateEntity current = root;
-		int i = 0;
-		for(String groupbyField : groupbyFields){
-			// determine groupbyFieldValue from tag or fields; the index must be the position of
-			// the field in groupbyFields, so it advances with every field
-			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
-			SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
-			HierarchicalAggregateEntity child = children.get(groupbyFieldValue);
-			if(child == null){
-				child = new HierarchicalAggregateEntity();
-				children.put(groupbyFieldValue, child);
-			}
-			child.setKey(groupbyFieldValue);
-			addDatapoint(child, preAggregatedValues);
-			current = child;
-		}
-	}
-
-	/**
-	 * Feeds the values into the functions of a node; the functions are created on first use,
-	 * one per aggregate function type. The i-th value goes to the i-th function.
-	 */
-	private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){
-		List<GroupbyBucket.Function> functions = entity.getTmpValues();
-		// initialize list of function
-		if(functions.isEmpty()){
-			for(AggregateFunctionType type : aggregateFunctionTypes){
-				functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction());
-			}
-		}
-		int functionIndex = 0;
-		for(Double v : values){
-			functions.get(functionIndex).run(v);
-			functionIndex++;
-		}
-	}
-
-	/**
-	 * Copies the function results of a node and of all its descendants into their value lists
-	 * and then drops the functions. Nodes whose functions or children have already been set to
-	 * null are left as they are, so finalizing the same tree again is harmless.
-	 */
-	private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){
-		List<GroupbyBucket.Function> functions = entity.getTmpValues();
-		if(functions != null){
-			for(GroupbyBucket.Function f : functions){
-				entity.getValues().add(f.result());
-			}
-		}
-		SortedMap<String, HierarchicalAggregateEntity> children = entity.getChildren();
-		if(children != null){
-			for(HierarchicalAggregateEntity child : children.values()){
-				finalizeHierarchicalAggregateEntity(child);
-			}
-		}
-		entity.setTmpValues(null);
-	}
-
-	/**
-	 * Finalizes the tree and returns its root. After this call the nodes no longer hold their
-	 * functions, so no further entities can be accumulated.
-	 *
-	 * @return root of the aggregation tree
-	 */
-	public HierarchicalAggregateEntity result(){
-		finalizeHierarchicalAggregateEntity(root);
-		return this.root;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
deleted file mode 100644
index b048cf8..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * Sorts the flat result of an aggregation (group key mapped to aggregated values) by a list of
- * {@link SortOption}s and optionally cuts the result off after the first N entries.
- */
-public class PostFlatAggregateSort {
-	// puts all entries of the map into a set ordered by the sort options
-	private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
-		Comparator<Map.Entry<List<String>, List<Double>>> ordering = new MapEntryComparator(sortOptions);
-		SortedSet<Map.Entry<List<String>, List<Double>>> ordered = new TreeSet<Map.Entry<List<String>, List<Double>>>(ordering);
-		ordered.addAll(map.entrySet());
-		return ordered;
-	}
-
-	/**
-	 * sort aggregated results with sort options
-	 *
-	 * @param aggregatedResult aggregated result set, but it is not sorted
-	 * @param sortOptions sorting options
-	 * @param topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
-	 * @return the sorted entries, limited to topN entries when topN is positive
-	 */
-	public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){
-		List<Map.Entry<List<String>, List<Double>>> top = new ArrayList<Map.Entry<List<String>, List<Double>>>();
-		for (Map.Entry<List<String>, List<Double>> candidate : sortByValue(aggregatedResult, sortOptions)) {
-			// a positive topN caps the size of the returned list
-			if (topN > 0 && top.size() >= topN) {
-				break;
-			}
-			top.add(candidate);
-		}
-		return top;
-	}
-
-	private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{
-		private List<SortOption> sortOptions;
-		public MapEntryComparator(List<SortOption> sortOptions){
-			this.sortOptions = sortOptions;
-		}
-		/**
-		 * Applies the sort options in order; the first one that tells the entries apart decides.
-		 * default to sort by all groupby fields
-		 */
-		@Override
-		public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){
-			List<String> leftKeys = e1.getKey();
-			List<Double> leftValues = e1.getValue();
-			List<String> rightKeys = e2.getKey();
-			List<Double> rightValues = e2.getValue();
-			for(SortOption so : sortOptions){
-				int index = so.getIndex();
-				if (index == -1) {
-					// sort field that could not be resolved
-					continue;
-				}
-				int order;
-				if(so.isInGroupby()){  // sort fields come from groupby fields
-					order = leftKeys.get(index).compareTo(rightKeys.get(index));
-				}else{  // sort fields come from functions
-					order = leftValues.get(index).compareTo(rightValues.get(index));
-				}
-				if(order != 0){
-					return so.isAscendant() ? order : -order;
-				}
-			}
-			// no sort option told the entries apart: sort by groupby fields ascendently
-			return new GroupbyFieldsComparator().compare(leftKeys, rightKeys);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
deleted file mode 100644
index ba1218c..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * Sorts a hierarchical aggregation result level by level: the children of every node are moved
- * from the node's child map into its sorted list, ordered by a list of {@link SortOption}s.
- */
-public class PostHierarchicalAggregateSort {
-
-	// puts the child entries of the entity into a set ordered by the sort options
-	private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
-		Comparator<Map.Entry<String, HierarchicalAggregateEntity>> ordering = new MapEntryComparator(sortOptions);
-		SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> ordered = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(ordering);
-		ordered.addAll(entity.getChildren().entrySet());
-		return ordered;
-	}
-
-	/**
-	 * sort aggregated results with sort options
-	 *
-	 * @param result node whose children, and recursively their children, are sorted; its child
-	 *               map is set to null once the sorted list is in place
-	 * @param sortOptions sorting options
-	 * @return the node passed in
-	 */
-	public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){
-		SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> ordered = sortByValue(result, sortOptions);
-		result.setSortedList(ordered);
-		result.setChildren(null);
-		for(Map.Entry<String, HierarchicalAggregateEntity> childEntry : ordered){
-			sort(childEntry.getValue(), sortOptions);
-		}
-		return result;
-	}
-
-	private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{
-		private List<SortOption> sortOptions;
-
-		public MapEntryComparator(List<SortOption> sortOptions){
-			this.sortOptions = sortOptions;
-		}
-
-		/**
-		 * Applies the function-based sort options in order; the first one that tells the
-		 * entries apart decides. Falls back to comparing the entry keys.
-		 */
-		@Override
-		public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){
-			String leftKey = e1.getKey();
-			List<Double> leftValues = e1.getValue().getValues();
-			String rightKey = e2.getKey();
-			List<Double> rightValues = e2.getValue().getValues();
-			for(SortOption so : sortOptions){
-				int index = so.getIndex();
-				if (index == -1) {
-					// sort field that could not be resolved
-					continue;
-				}
-				if(so.isInGroupby()){
-					// sort fields come from groupby fields, then silently ignored
-					continue;
-				}
-				// sort fields come from functions
-				int order = leftValues.get(index).compareTo(rightValues.get(index));
-				if(order == 0){
-					continue;
-				}
-				return so.isAscendant() ? order : -order;
-			}
-			// default to sort by groupby fields ascendently
-			return leftKey.compareTo(rightKey);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java
deleted file mode 100644
index e9b8733..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-/**
- * Describes one sort criterion of an aggregation query.
- * <p>
- * Example: sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field4 desc
- * results in 2 SortOption objects:
- * the 1st one is inGroupby=false, index=0, ascendant=true
- * the 2nd one is inGroupby=true, index=1, ascendant=false
- */
-public class SortOption {
-	/** true if the sort field is a groupby field; by default it is not */
-	private boolean inGroupby = false;
-	/** index relative to list of groupby fields or list of functions */
-	private int index = 0;
-	/** true for asc, false for desc */
-	private boolean ascendant = false;
-
-	/** @return true if the sort field comes from the groupby fields */
-	public boolean isInGroupby() {
-		return this.inGroupby;
-	}
-
-	/** @param inGroupby true if the sort field comes from the groupby fields */
-	public void setInGroupby(boolean inGroupby) {
-		this.inGroupby = inGroupby;
-	}
-
-	/** @return index of the sort field in its list */
-	public int getIndex() {
-		return this.index;
-	}
-
-	/** @param index index of the sort field in its list */
-	public void setIndex(int index) {
-		this.index = index;
-	}
-
-	/** @return true for ascending order, false for descending order */
-	public boolean isAscendant() {
-		return this.ascendant;
-	}
-
-	/** @param ascendant true for ascending order, false for descending order */
-	public void setAscendant(boolean ascendant) {
-		this.ascendant = ascendant;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java
deleted file mode 100644
index a5829d3..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Turns textual sort options of the form "&lt;field&gt; asc" or "&lt;field&gt; desc" into
- * {@link SortOption} objects.
- */
-public class SortOptionsParser {
-	private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
-	private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
-
-	/**
-	 * Parses every sort option and resolves its field first against the aggregated fields and
-	 * then against the groupby fields. A field found in neither list is logged and gets the
-	 * index -1.
-	 *
-	 * @param groupbyFields groupby fields; may be null, then they are not searched
-	 * @param aggregatedFields aggregated fields, searched first
-	 * @param sortOptions sort options to parse
-	 * @param sortFields if not null, receives the field of every parsed sort option
-	 * @return one SortOption per sort option, in the same order
-	 * @throws IllegalArgumentException if a sort option does not have the expected format
-	 */
-	public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){
-		List<SortOption> parsed = new ArrayList<SortOption>();
-		for(String sortOption : sortOptions){
-			Matcher matcher = pattern.matcher(sortOption);
-			if(!matcher.find()){
-				throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
-			}
-			String field = matcher.group(1);
-			if (sortFields != null) {
-				sortFields.add(field);
-			}
-			SortOption option = new SortOption();
-			option.setAscendant("asc".equals(matcher.group(2)));
-
-			int functionIndex = aggregatedFields.indexOf(field);
-			// groupby fields are only searched when the field is not an aggregated field
-			int groupbyIndex = -1;
-			if(functionIndex <= -1 && groupbyFields != null){
-				groupbyIndex = groupbyFields.indexOf(field);
-			}
-
-			if(functionIndex > -1){
-				option.setInGroupby(false);
-				option.setIndex(functionIndex);
-			}else if(groupbyIndex > -1){
-				option.setInGroupby(true);
-				option.setIndex(groupbyIndex);
-			}else{
-				// unknown field: keep the option with index -1
-				LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + field);
-				option.setInGroupby(false);
-				option.setIndex(-1);
-			}
-			parsed.add(option);
-		}
-		return parsed;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
deleted file mode 100755
index 9380a52..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-/**
- * Wrapper around another {@link Aggregator}: every accumulate call is forwarded to the wrapped
- * aggregator while holding a lock that is private to this wrapper.
- */
-public class SynchronizedAggregator implements Aggregator{
-	// guards every call into the wrapped aggregator
-	private final Object lock = new Object();
-	private final Aggregator delegate;
-
-	/**
-	 * @param agg aggregator that receives the forwarded calls
-	 */
-	public SynchronizedAggregator(Aggregator agg){
-		this.delegate = agg;
-	}
-
-	/**
-	 * Forwards the entity to the wrapped aggregator under the lock.
-	 *
-	 * @param entity entity to accumulate
-	 */
-	@Override
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		synchronized(lock){
-			delegate.accumulate(entity);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
deleted file mode 100644
index 884a74a..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.EntityCreationListener;
-
-/**
- * {@link EntityCreationListener} decorator that forwards every {@code entityCreated}
- * callback to the wrapped listener while holding a private lock, so callbacks that
- * reach the wrapped listener through this object never run concurrently with each other.
- */
-public class SynchronizedEntityCreationListener implements EntityCreationListener{
-	// listener that receives the serialized callbacks
-	private EntityCreationListener delegate;
-	// private monitor held for the duration of each delegated callback
-	private Object lock = new Object();
-
-	/**
-	 * @param listener listener to wrap; it is only invoked while the internal lock is held
-	 */
-	public SynchronizedEntityCreationListener(EntityCreationListener listener){
-		delegate = listener;
-	}
-
-	/**
-	 * Hands the entity to the wrapped listener under the internal lock.
-	 *
-	 * @param entity entity passed through to the wrapped listener unchanged
-	 * @throws Exception anything thrown by the wrapped listener
-	 */
-	@Override
-	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-		synchronized(lock){
-			delegate.entityCreated(entity);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
deleted file mode 100755
index 5cf6ab4..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.aggregate.raw.GroupbyKeyAggregatable;
-import eagle.query.aggregate.raw.GroupbyKeyValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would 
- * save memory for holding all the data in the memory
- *
- * <h3>Aggregate Bucket Structure</h3>
- * <pre>
- * {
- *  ["key<SUB>1</SUB>","key<SUB>2</SUB>",...,(entity.getTimestamp() - startTime)/intervalms]:[value<SUB>1</SUB>,value<SUB>2</SUB>,...,value<SUB>n</SUB>]
- * }
- * </pre>
- *
- */
-public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable {
-	private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
-	// inclusive lower bound of the entity timestamps that are accumulated
-	private long startTime;
-	// exclusive upper bound of the entity timestamps that are accumulated
-	private long endTime;
-	// width of one time series bucket, in the same unit as the timestamps
-	private long intervalms;
-	// number of aggregate functions, i.e. number of value arrays built per group by getMetric
-	private int numFunctions;
-	// number of entities skipped by accumulate because their timestamp was out of range
-	private int ignoredEntityCounter = 0;
-
-	/**
-	 * @param groupbyFields passed through to the parent aggregator
-	 * @param aggregateFuntionTypes passed through to the parent aggregator; its size is the number of functions
-	 * @param aggregatedFields passed through to the parent aggregator
-	 * @param startTime inclusive start of the accepted time range
-	 * @param endTime exclusive end of the accepted time range
-	 * @param intervalms width of one time series bucket
-	 */
-	public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
-			long startTime, long endTime, long intervalms){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-		this.startTime = startTime;
-		this.endTime = endTime;
-		this.intervalms = intervalms;
-		this.numFunctions = aggregateFuntionTypes.size();
-	}
-
-	/**
-	 * Adds one entity to the bucket identified by its group-by values plus its time series index,
-	 * {@code (timestamp - startTime) / intervalms}. Entities whose timestamp is outside
-	 * {@code [startTime, endTime)} are counted as ignored and otherwise skipped.
-	 *
-	 * @param entity entity to accumulate
-	 * @throws Exception declared for the helpers inherited from the parent aggregator
-	 */
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<String> bucketKey = createGroup(entity);
-		// TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
-		long timestamp = entity.getTimestamp();
-		boolean withinRange = timestamp >= this.startTime && timestamp < this.endTime;
-		if(!withinRange){
-			if(LOG.isDebugEnabled()){
-				LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + timestamp + ", startTime:" + startTime + ", endTime:" + endTime);
-			}
-			this.ignoredEntityCounter ++;
-			return;
-		}
-		// the time series bucket index becomes the last element of the bucket key
-		long bucketIndex = (timestamp - startTime) / intervalms;
-		bucketKey.add(String.valueOf(bucketIndex));
-		bucket.addDatapoint(bucketKey, createPreAggregatedValues(entity));
-	}
-
-	/**
-	 * @return the bucket's flat result, keyed by group-by values followed by the time series index
-	 */
-	public Map<List<String>, List<Double>> result(){
-		warnIfEntitiesIgnored();
-		return bucket.result();
-	}
-
-	/**
-	 * Support new aggregate result
-	 *
-	 * @return the bucket's group-by key/value pairs
-	 */
-	@Override
-	public List<GroupbyKeyValue> getGroupbyKeyValues(){
-		warnIfEntitiesIgnored();
-		return bucket.getGroupbyKeyValue();
-	}
-
-	/**
-	 * Converts the flat bucket result into one array of datapoints per group and function.
-	 * The array length is the number of intervals needed to cover {@code [startTime, endTime)}.
-	 *
-	 * @return map from group-by values to one {@code double[]} per aggregate function
-	 */
-	public Map<List<String>, List<double[]>> getMetric(){
-		// groupbyfields+timeseriesbucket --> aggregatedvalues for different function
-		Map<List<String>, List<Double>> flat = bucket.result();
-		// startTime is inclusive and endTime is exclusive, hence the "-1" before dividing
-		int datapointCount = (int)((endTime-1-startTime)/intervalms + 1);
-		return toMetric(flat, datapointCount, this.numFunctions);
-	}
-
-	/**
-	 * Pivots a flat result whose keys end with a time series index into per-group arrays.
-	 * For every entry the last key element is parsed as the index; the remaining key elements
-	 * identify the group. Each group gets {@code numFunctions} arrays of length
-	 * {@code numDatapoints}, and the i-th value of the entry is stored at that index of the i-th array.
-	 *
-	 * @param result flat result keyed by group-by values followed by the time series index
-	 * @param numDatapoints length of every generated array
-	 * @param numFunctions number of arrays generated per group
-	 * @return map from group-by values (without the index) to the per-function arrays
-	 */
-	public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){
-		Map<List<String>, List<double[]>> seriesByGroup = new HashMap<List<String>, List<double[]>>();
-		for(Map.Entry<List<String>, List<Double>> flatEntry : result.entrySet()){
-			// split the key into the group part and the trailing time series index
-			List<String> groupKey = new ArrayList<String>(flatEntry.getKey());
-			String indexText = groupKey.remove(groupKey.size()-1);
-			List<double[]> series = seriesByGroup.get(groupKey);
-			if(series == null){
-				// first datapoint of this group: allocate one zero-filled array per function
-				series = new ArrayList<double[]>();
-				seriesByGroup.put(groupKey, series);
-				for(int f=0; f<numFunctions; f++){
-					series.add(new double[numDatapoints]);
-				}
-			}
-			int slot = Integer.parseInt(indexText);
-			List<Double> functionResults = flatEntry.getValue();
-			for(int f=0; f<series.size(); f++){
-				series.get(f)[slot] = functionResults.get(f);
-			}
-		}
-		return seriesByGroup;
-	}
-
-	/**
-	 * Logs a warning with the number of ignored entities, if any were ignored.
-	 */
-	private void warnIfEntitiesIgnored(){
-		if(this.ignoredEntityCounter > 0)
-			LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
deleted file mode 100644
index 3b95c8f..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * only numeric aggregation is supported and number type supported is double
- */
-public class TimeSeriesBucket {
-	private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
-	// inclusive lower bound of the covered time range, milliseconds
-	private long startTime;
-	// exclusive upper bound of the covered time range, milliseconds
-	private long endTime;
-	// width of one time series slot, milliseconds
-	private long interval;
-	
-	// one array per aggregate function (in function order), one slot per interval
-	List<double[]> aggregatedValues = new ArrayList<double[]>();
-	
-	/**
-	 * Creates a bucket whose slots are aligned from {@code startTime}. One array of
-	 * {@code (endTime - startTime) / intervalms} slots is allocated per aggregate function.
-	 *
-	 * @param startTime inclusive start of the range, milliseconds
-	 * @param endTime exclusive end of the range, milliseconds
-	 * @param intervalms width of one slot, milliseconds
-	 * @param numAggFunctions number of aggregate functions, i.e. number of arrays to allocate
-	 */
-	public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){
-		// keep the range: addDataPoint needs it to validate a timestamp and to locate its slot
-		this.startTime = startTime;
-		this.endTime = endTime;
-		this.interval = intervalms;
-		int count =(int)((endTime-startTime)/intervalms);
-		for(int i=0; i<numAggFunctions; i++){
-			aggregatedValues.add(new double[count]);
-		}
-	}
-	
-	/**
-	 * add datapoint which has a list of values for different aggregate functions
-	 * for example, sum(numHosts), count(*), avg(timespan) etc
-	 * <p>
-	 * The i-th value is added to the slot of the i-th aggregate function that contains
-	 * {@code timestamp}. Datapoints outside {@code [startTime, endTime)}, or falling into a
-	 * trailing partial interval for which no slot was allocated, are logged and ignored.
-	 *
-	 * @param timestamp datapoint time, milliseconds
-	 * @param values one value per aggregate function, in function order
-	 */
-	public void addDataPoint(long timestamp, List<Double> values){
-		// endTime is exclusive: timestamp == endTime would map to the slot after the last one
-		if(timestamp < startTime || timestamp >= endTime){
-			LOG.warn("timestamp<startTime or timestamp>=endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
-			return;
-		}
-		// locate timeseries bucket
-		int located =(int)((timestamp - startTime)/interval);
-		// when the range is not a multiple of the interval the last partial interval has no slot
-		if(!aggregatedValues.isEmpty() && located >= aggregatedValues.get(0).length){
-			LOG.warn("timestamp is in the trailing partial interval, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
-			return;
-		}
-		int index = 0;
-		for(Double src : values){
-			double[] timeSeriesValues = aggregatedValues.get(index);
-			timeSeriesValues[located] += src;
-			index++;
-		}
-	}
-	
-	/**
-	 * @return the live list of per-function slot arrays (not a copy)
-	 */
-	public List<double[]> aggregatedValues(){
-		return this.aggregatedValues;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
deleted file mode 100644
index 3c75e52..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-public class TimeSeriesPostFlatAggregateSort {
-
-	/**
-	 * Orders the flat aggregate entries by the sort options and returns, in that order,
-	 * the matching time series entries.
-	 * <p>
-	 * Side effect: every sort option gets its index set to its position in {@code sortOptions}.
-	 *
-	 * @param mapForSort flat aggregate values that decide the order
-	 * @param valueMap time series values looked up by the same keys; keys missing here are skipped
-	 * @param sortOptions sort criteria, applied in list order
-	 * @param topN maximum number of entries to return; values &lt;= 0 mean no limit
-	 * @return sorted entries pairing each key with its value from {@code valueMap}
-	 */
-	public static List<Map.Entry<List<String>, List<double[]>>> sort(
-			Map<List<String>, List<Double>> mapForSort,
-			Map<List<String>, List<double[]>> valueMap,
-			List<SortOption> sortOptions, int topN) {
-
-		processIndex(sortOptions);
-		List<Map.Entry<List<String>, List<double[]>>> ranked = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
-		for (Map.Entry<List<String>, List<Double>> candidate : sortByValue(mapForSort, sortOptions)) {
-			List<String> groupKey = candidate.getKey();
-			List<double[]> series = valueMap.get(groupKey);
-			if (series == null) {
-				continue;
-			}
-			ranked.add(new ImmutableEntry<List<String>, List<double[]>>(groupKey, series));
-			if (topN > 0 && ranked.size() >= topN) {
-				break;
-			}
-		}
-		return ranked;
-	}
-
-	/**
-	 * Copies the map entries into a set ordered by {@link MapEntryComparator}.
-	 */
-	private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
-			Map<List<String>, List<Double>> mapForSort,
-			List<SortOption> sortOptions) {
-		SortedSet<Map.Entry<List<String>, List<Double>>> ordered = new TreeSet<Map.Entry<List<String>, List<Double>>>(
-				new MapEntryComparator(sortOptions));
-		ordered.addAll(mapForSort.entrySet());
-		return ordered;
-	}
-
-	/**
-	 * Sets the index of every sort option to its position in the list.
-	 */
-	private static void processIndex(List<SortOption> sortOptions) {
-		int position = 0;
-		while (position < sortOptions.size()) {
-			sortOptions.get(position).setIndex(position);
-			++position;
-		}
-	}
-
-	/**
-	 * Compares two aggregate entries by the sort options, in list order.
-	 */
-	private static class MapEntryComparator implements
-			Comparator<Map.Entry<List<String>, List<Double>>> {
-		private List<SortOption> sortOptions;
-
-		public MapEntryComparator(List<SortOption> sortOptions) {
-			this.sortOptions = sortOptions;
-		}
-
-		/**
-		 * The first sort option on which the entries differ decides the result, negated for a
-		 * non-ascendant option. Options whose index is -1 are skipped. If no option
-		 * distinguishes the entries, the keys are compared with {@code GroupbyFieldsComparator}
-		 * (default to sort by all groupby fields).
-		 */
-		@Override
-		public int compare(Map.Entry<List<String>, List<Double>> e1,
-				Map.Entry<List<String>, List<Double>> e2) {
-			List<String> leftKeys = e1.getKey();
-			List<Double> leftValues = e1.getValue();
-			List<String> rightKeys = e2.getKey();
-			List<Double> rightValues = e2.getValue();
-			for (SortOption option : sortOptions) {
-				int index = option.getIndex();
-				if (index == -1) {
-					continue;
-				}
-				int order;
-				if (option.isInGroupby()) { // sort fields come from groupby fields
-					order = leftKeys.get(index).compareTo(rightKeys.get(index));
-				} else { // sort fields come from functions
-					order = leftValues.get(index).compareTo(rightValues.get(index));
-				}
-				if (order != 0) {
-					return option.isAscendant() ? order : -order;
-				}
-			}
-			// every applicable option compared equal: fall back to the groupby fields
-			return new GroupbyFieldsComparator().compare(leftKeys, rightKeys);
-		}
-	}
-
-	/**
-	 * Read-only key/value pair; {@link #setValue(Object)} always throws.
-	 */
-	static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
-		private static final long serialVersionUID = 0;
-
-		private final K key;
-		private final V value;
-
-		ImmutableEntry(K key, V value) {
-			this.key = key;
-			this.value = value;
-		}
-
-		@Override
-		public K getKey() {
-			return key;
-		}
-
-		@Override
-		public V getValue() {
-			return value;
-		}
-
-		@Override
-		public final V setValue(V value) {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
new file mode 100755
index 0000000..1319f43
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @since : 10/30/14,2014
+ */
+public class GenericEntityQuery implements GenericQuery,EntityCreationListener {
+	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityQuery.class);
+
+	// reader this query registers itself on and reads from
+	private StreamReader reader;
+	// entities collected through entityCreated, returned by result()
+	private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+
+	/**
+	 * Builds the stream reader for the query and registers this object as its listener.
+	 * For the generic metric service a metric name is mandatory and the metric value field
+	 * is added to the condition's output fields when it is not already there.
+	 *
+	 * @param serviceName service to query
+	 * @param condition search condition; its output fields may be extended (see above)
+	 * @param metricName metric name, only used for the generic metric service
+	 * @throws IllegalArgumentException if the generic metric service is queried without a metric name
+	 * @throws IllegalAccessException declared for the reader construction
+	 * @throws InstantiationException declared for the reader construction
+	 */
+	public GenericEntityQuery(String serviceName, SearchCondition condition, String metricName) throws IllegalAccessException, InstantiationException {
+		boolean metricQuery = serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE);
+		if(!metricQuery){
+			if(LOG.isDebugEnabled()) LOG.debug("List entity query");
+			reader = new GenericEntityStreamReader(serviceName, condition);
+		}else{
+			if(LOG.isDebugEnabled()) LOG.debug("List metric query");
+			boolean metricNameMissing = metricName == null || metricName.isEmpty();
+			if(metricNameMissing){
+				throw new IllegalArgumentException("metricName should not be empty for metric list query");
+			}
+			boolean valueRequested = condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD);
+			if(!valueRequested){
+				condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
+			}
+			reader = new GenericEntityStreamReader(serviceName, condition,metricName);
+		}
+		reader.register(this);
+	}
+
+	/**
+	 * @return the first timestamp reported by the reader
+	 */
+	@Override
+	public long getFirstTimeStamp() {
+		return reader.getFirstTimestamp();
+	}
+
+	/**
+	 * @return the last timestamp reported by the reader
+	 */
+	@Override
+	public long getLastTimestamp() {
+		return reader.getLastTimestamp();
+	}
+
+	/**
+	 * Listener callback: appends the entity to the collected result list.
+	 */
+	@Override
+	public void entityCreated(TaggedLogAPIEntity entity){
+		entities.add(entity);
+	}
+
+	/**
+	 * Lets the reader read as a stream, then returns the collected entity list.
+	 * NOTE(review): presumably the reader delivers entities through entityCreated while
+	 * reading, and a second call would append to the same list - confirm against StreamReader.
+	 *
+	 * @return the list of entities collected so far (the internal list, not a copy)
+	 * @throws Exception anything thrown by the reader
+	 */
+	@Override
+	public List<TaggedLogAPIEntity> result() throws Exception{
+		if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
+		reader.readAsStream();
+		return entities;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
new file mode 100755
index 0000000..d3af151
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query;
+
+import java.util.List;
+
+/**
+ * @since : 10/30/14,2014
+ */
+public interface GenericQuery {
+	/**
+	 * Returns the result entities of this query.
+	 * Implementations are expected to throw all exceptions to the caller (the http server)
+	 * instead of handling them.
+	 *
+	 * @param <T> result entity type
+	 * @return result entities list
+	 *
+	 * @throws Exception any failure raised while producing the result
+	 */
+	<T> List<T> result() throws Exception;
+
+	/**
+	 * Get last/largest timestamp on all rows
+	 *
+	 * @return last timestamp
+	 */
+	long getLastTimestamp();
+
+	/**
+	 * Get first timestamp on all rows
+	 *
+	 * @return first timestamp
+	 */
+	long getFirstTimeStamp();
+}
\ No newline at end of file