You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:16 UTC
[09/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java
deleted file mode 100755
index e94487e..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawAggregator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.raw;
-
-import eagle.log.entity.QualifierCreationListener;
-import eagle.log.entity.meta.EntityDefinition;
-import eagle.query.aggregate.AggregateFunctionType;
-
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-
-public class RawAggregator implements QualifierCreationListener,GroupbyKeyAggregatable {
- private List<String> groupbyFields;
- private GroupbyKey key;
- private static final byte[] UNASSIGNED = "unassigned".getBytes();
- private RawGroupbyBucket bucket;
-
- public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed){
- this.groupbyFields = groupbyFields;
- key = new GroupbyKey();
- bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed);
- }
-
- @Override
- public void qualifierCreated(Map<String, byte[]> qualifiers){
- key.clear();
- ListIterator<String> it = groupbyFields.listIterator();
- while(it.hasNext()){
- byte[] groupbyFieldValue = qualifiers.get(it.next());
- if(groupbyFieldValue == null){
- key.addValue(UNASSIGNED);
- }else{
- key.addValue(groupbyFieldValue);
- }
- }
- GroupbyKey newKey = null;
- if(bucket.exists(key)){
- newKey = key;
- }else{
- newKey = new GroupbyKey(key);
- }
-
- bucket.addDatapoint(newKey, qualifiers);
- }
-
- /**
- * @return
- */
- public Map<List<String>, List<Double>> result(){
- return bucket.result();
- }
-
- public List<GroupbyKeyValue> getGroupbyKeyValues(){
- return bucket.groupbyKeyValues();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java
deleted file mode 100755
index e633723..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/RawGroupbyBucket.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.raw;
-
-import eagle.log.entity.EntityQualifierUtils;
-import eagle.log.entity.GenericMetricEntity;
-import eagle.log.entity.meta.*;
-import eagle.log.expression.ExpressionParser;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.parser.TokenConstant;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class RawGroupbyBucket {
- private final static Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class);
-
- private List<String> aggregatedFields;
- private EntityDefinition entityDefinition;
-
-
- private List<AggregateFunctionType> types;
- private SortedMap<GroupbyKey, List<Function>> group2FunctionMap =
- new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator());
-
- public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed){
- this.types = types;
- this.aggregatedFields = aggregatedFields;
- this.entityDefinition = ed;
- }
-
- public boolean exists(GroupbyKey key){
- return group2FunctionMap.containsKey(key);
- }
-
- public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values){
- // locate groupby bucket
- List<Function> functions = group2FunctionMap.get(groupbyKey);
- if(functions == null){
- functions = new ArrayList<Function>();
- for(AggregateFunctionType type : types){
- FunctionFactory ff = FunctionFactory.locateFunctionFactory(type);
- if(ff == null){
- LOG.error("FunctionFactory of AggregationFunctionType:"+type+" is null");
- }else{
- functions.add(ff.createFunction());
- }
- }
- group2FunctionMap.put(groupbyKey, functions);
- }
- ListIterator<Function> e1 = functions.listIterator();
- ListIterator<String> e2 = aggregatedFields.listIterator();
- while(e1.hasNext() && e2.hasNext()){
- Function f = e1.next();
- String aggregatedField = e2.next();
- byte[] v = values.get(aggregatedField);
- if(f instanceof Function.Count){ // handle count
- if(entityDefinition.getMetricDefinition()==null) {
- f.run(1.0);
- continue;
- }else if(v == null){
- aggregatedField = GenericMetricEntity.VALUE_FIELD;
- v = values.get(aggregatedField);
- }
- }
- if(v != null){
- Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField);
- EntitySerDeser<?> serDeser = q.getSerDeser();
- // double d = 0.0;
- if(serDeser instanceof IntSerDeser){
- double d= (Integer)serDeser.deserialize(v);
- f.run(d);
- }else if(serDeser instanceof LongSerDeser){
- double d = (Long)serDeser.deserialize(v);
- f.run(d);
- }else if(serDeser instanceof DoubleSerDeser){
- double d = (Double)serDeser.deserialize(v);
- f.run(d);
- // TODO: support numeric array type that is not metric
- }else if(serDeser instanceof DoubleArraySerDeser){
- double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v);
- if(f instanceof Function.Count){
- f.run(d.length);
- } else {
- for(double i:d) f.run(i);
- }
- }else if(serDeser instanceof IntArraySerDeser){
- int[] d = ((IntArraySerDeser) serDeser).deserialize(v);
- if(f instanceof Function.Count){
- f.run(d.length);
- }else{
- for(int i:d) f.run(i);
- }
- }else{
- if(LOG.isDebugEnabled()) LOG.debug("EntitySerDeser of field "+aggregatedField+" is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0");
- }
- }else if(TokenConstant.isExpression(aggregatedField)){
- String expression = TokenConstant.parseExpressionContent(aggregatedField);
- try {
- Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition);
- if(entityDefinition.getMetricDefinition() == null) {
- double value = ExpressionParser.eval(expression,doubleMap);
- // LOG.info("DEBUG: Eval "+expression +" = "+value);
- f.run(value);
- }else{
- Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
- EntitySerDeser _serDeser = qualifier.getSerDeser();
- byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD);
- if( _serDeser instanceof DoubleArraySerDeser){
- double[] d = (double[]) _serDeser.deserialize(valueBytes);
- if(f instanceof Function.Count) {
- f.run(d.length);
- }else{
- for(double i:d){
- doubleMap.put(GenericMetricEntity.VALUE_FIELD,i);
- f.run(ExpressionParser.eval(expression, doubleMap));
- }
- }
- }else if(_serDeser instanceof IntArraySerDeser){
- int[] d = (int[]) _serDeser.deserialize(valueBytes);
- if(f instanceof Function.Count) {
- f.run(d.length);
- }else {
- for (double i : d) {
- doubleMap.put(GenericMetricEntity.VALUE_FIELD, i);
- f.run(ExpressionParser.eval(expression, doubleMap));
- }
- }
- }else{
- double value = ExpressionParser.eval(expression,doubleMap);
- f.run(value);
- }
- }
- } catch (Exception e) {
- LOG.error("Got exception to evaluate expression: "+expression+", exception: "+e.getMessage(),e);
- }
- }
- }
- }
-
- /**
- * expensive operation - create objects and format the result
- * @return
- */
- public List<GroupbyKeyValue> groupbyKeyValues(){
- List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
- for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
- GroupbyValue value = new GroupbyValue();
- for(Function f : entry.getValue()){
- value.add(new DoubleWritable(f.result()));
- value.addMeta(f.count());
- }
- results.add(new GroupbyKeyValue(entry.getKey(),value));
- }
- return results;
- }
-
- /**
- * expensive operation - create objects and format the result
- * @return
- */
- public Map<List<String>, List<Double>> result(){
- Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
- for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){
- List<Double> values = new ArrayList<Double>();
- for(Function f : entry.getValue()){
- values.add(f.result());
- }
- GroupbyKey key = entry.getKey();
- List<BytesWritable> list1 = key.getValue();
- List<String> list2 = new ArrayList<String>();
- for(BytesWritable e : list1){
- list2.add(new String(e.copyBytes()));
- }
- result.put(list2, values);
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java
deleted file mode 100755
index db774c1..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/raw/WritableList.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.raw;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
-import java.util.ArrayList;
-
-/**
- * @since : 11/6/14,2014
- */
-public class WritableList<E extends Writable> extends ArrayList<E> implements Writable{
- private Class<E> itemTypeClass;
-
- public WritableList(Class<E> typeClass){
- this.itemTypeClass = typeClass;
- }
-
- public WritableList(Class<E> typeClass,int initialCapacity){
- super(initialCapacity);
- this.itemTypeClass = typeClass;
- }
-
-
- /**
- * <h3> Get item class by </h3>
- * <pre>
- * (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
- * </pre>
- */
- @Deprecated
- public WritableList(){
- this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
- }
-
- private void check() throws IOException{
- if(this.itemTypeClass == null){
- throw new IOException("Class Type of WritableArrayList<E extends Writable> is null");
- }
- }
-
- public Class<E> getItemClass(){
- return itemTypeClass;
- }
-
- /**
- * Serialize the fields of this object to <code>out</code>.
- *
- * @param out <code>DataOuput</code> to serialize this object into.
- * @throws java.io.IOException
- */
- @Override
- public void write(DataOutput out) throws IOException {
- this.check();
- out.writeInt(this.size());
- for(Writable item: this){
- item.write(out);
- }
- }
-
- /**
- * Deserialize the fields of this object from <code>in</code>.
- * <p/>
- * <p>For efficiency, implementations should attempt to re-use storage in the
- * existing object where possible.</p>
- *
- * @param in <code>DataInput</code> to deseriablize this object from.
- * @throws java.io.IOException
- */
- @Override
- public void readFields(DataInput in) throws IOException {
- this.check();
- int size = in.readInt();
- for(int i=0;i<size;i++){
- try {
- E item = itemTypeClass.newInstance();
- item.readFields(in);
- this.add(item);
- } catch (InstantiationException e) {
- throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
- } catch (IllegalAccessException e) {
- throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java
deleted file mode 100755
index c857f5c..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/AbstractAggregator.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.EntityCreationListener;
-import eagle.log.expression.ExpressionParser;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.aggregate.IllegalAggregateFieldTypeException;
-import eagle.query.parser.TokenConstant;
-import org.apache.commons.beanutils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.beans.PropertyDescriptor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class AbstractAggregator implements Aggregator, EntityCreationListener{
- private final static Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class);
-
- private static final String UNASSIGNED = "unassigned";
- protected List<String> groupbyFields;
- protected List<AggregateFunctionType> aggregateFunctionTypes;
- protected List<String> aggregatedFields;
- // a cache to know immediately if groupby field should come from tags(true) or qualifiers(false)
- private Boolean[] _groupbyFieldPlacementCache;
- private Method[] _aggregateFieldReflectedMethodCache;
-
- public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
- this.groupbyFields = groupbyFields;
- this.aggregateFunctionTypes = aggregateFuntionTypes;
- this.aggregatedFields = aggregatedFields;
- _aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()];
- _groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()];
- }
-
- @Override
- public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
- accumulate(entity);
- }
-
- public abstract Object result();
-
- protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i){
- String groupbyFieldValue = entity.getTags().get(groupbyField);
- if(groupbyFieldValue != null){
- _groupbyFieldPlacementCache[i] = true;
- return groupbyFieldValue;
- }
- return null;
- }
-
- protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i){
- try{
- PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField);
- if(pd == null)
- return null;
-// _groupbyFieldPlacementCache.put(groupbyField, false);
- _groupbyFieldPlacementCache[i] = false;
- return (String)(pd.getReadMethod().invoke(entity));
- }catch(NoSuchMethodException ex){
- return null;
- }catch(InvocationTargetException ex){
- return null;
- }catch(IllegalAccessException ex){
- return null;
- }
- }
-
- protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i){
- Boolean placement = _groupbyFieldPlacementCache[i];
- String groupbyFieldValue = null;
- if(placement != null){
- groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i);
- }else{
- groupbyFieldValue = createGroupFromTags(entity, groupbyField, i);
- if(groupbyFieldValue == null){
- groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i);
- }
- }
- groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue);
- return groupbyFieldValue;
- }
-
- /**
- * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or
- * number of non-null field
- * For other aggregation, like sum,min,max,avg, we should resort to qualifiers
- * @param entity
- * @return
- */
- protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception{
- List<Double> values = new ArrayList<Double>();
- int functionIndex = 0;
- for(AggregateFunctionType type : aggregateFunctionTypes){
- if(type.name().equals(AggregateFunctionType.count.name())){
- values.add(new Double(1));
- }else{
- // find value in qualifier by checking java bean
- String aggregatedField = aggregatedFields.get(functionIndex);
- if(TokenConstant.isExpression(aggregatedField)){
- try {
- String expr = TokenConstant.parseExpressionContent(aggregatedField);
- values.add(ExpressionParser.eval(expr, entity));
- }catch (Exception ex){
- LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex);
- throw ex;
- }
- }else {
- try {
- Method m = _aggregateFieldReflectedMethodCache[functionIndex];
- if (m == null) {
-// pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField);
-// if (pd == null) {
-// final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName();
-// logger.error(errMsg);
-// throw new Exception(errMsg);
-// }
-// Object obj = pd.getReadMethod().invoke(entity);
- String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1);
- m = entity.getClass().getMethod("get" + tmp);
- _aggregateFieldReflectedMethodCache[functionIndex] = m;
- }
- Object obj = m.invoke(entity);
- values.add(numberToDouble(obj));
- } catch (Exception ex) {
- LOG.error("Cannot do aggregation for field " + aggregatedField, ex);
- throw ex;
- }
- }
- }
- functionIndex++;
- }
- return values;
- }
-
- /**
- * TODO this is a hack, we need elegant way to convert type to a broad precision
- *
- * @param obj
- * @return
- */
- protected Double numberToDouble(Object obj){
- if(obj instanceof Double)
- return (Double)obj;
- if(obj instanceof Integer){
- return new Double(((Integer)obj).doubleValue());
- }
- if(obj instanceof Long){
- return new Double(((Long)obj).doubleValue());
- }
- // TODO hack to support string field for demo purpose, should be removed
- if(obj == null){
- return new Double(0.0);
- }
- if(obj instanceof String){
- try{
- return new Double((String)obj);
- }catch(Exception ex){
- LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex);
- return new Double(0.0);
- }
- }
-
- throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java
deleted file mode 100755
index fd07526..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/Aggregator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-public interface Aggregator {
- /**
- * Accumulate callback
- *
- * @param entity accumulated entity instance
- * @throws Exception
- */
- public void accumulate(TaggedLogAPIEntity entity) throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
deleted file mode 100644
index 2d492b7..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.entity.EntityCreationListener;
-
-public class EntityCreationListenerFactory {
- public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener){
- return new SynchronizedEntityCreationListener(listener);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java
deleted file mode 100755
index dcc6b84..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/FlatAggregator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.query.aggregate.AggregateFunctionType;
-
-/**
- * Not thread safe
- */
-public class FlatAggregator extends AbstractAggregator{
- protected GroupbyBucket bucket;
-
- /**
- * @param groupbyFields
- * @param aggregateFuntionTypes
- * @param aggregatedFields
- */
- public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
- super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
- bucket = new GroupbyBucket(this.aggregateFunctionTypes);
- }
-
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- List<String> groupbyFieldValues = createGroup(entity);
- List<Double> preAggregatedValues = createPreAggregatedValues(entity);
- bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
- }
-
- public Map<List<String>, List<Double>> result(){
- return bucket.result();
- }
-
- protected List<String> createGroup(TaggedLogAPIEntity entity){
- List<String> groupbyFieldValues = new ArrayList<String>();
- int i = 0;
- for(String groupbyField : groupbyFields){
- String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
- groupbyFieldValues.add(groupbyFieldValue);
- }
- return groupbyFieldValues;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java
deleted file mode 100755
index 6475462..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyBucket.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.query.QueryConstants;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.aggregate.raw.GroupbyKey;
-import eagle.query.aggregate.raw.GroupbyKeyValue;
-import eagle.query.aggregate.raw.GroupbyValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class GroupbyBucket {
- private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class);
-
- public static Map<String, FunctionFactory> _functionFactories =
- new HashMap<>();
-
- // TODO put this logic to AggregatorFunctionType
- static{
- _functionFactories.put(AggregateFunctionType.count.name(), new CountFactory());
- _functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory());
- _functionFactories.put(AggregateFunctionType.min.name(), new MinFactory());
- _functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory());
- _functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory());
- }
-
- private List<AggregateFunctionType> types;
-// private SortedMap<List<String>, List<Function>> group2FunctionMap =
-// new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator());
-
- private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator());
-
- public GroupbyBucket(List<AggregateFunctionType> types){
- this.types = types;
- }
-
- public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){
- // LOG.info("DEBUG: addDatapoint: groupby=["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]");
-
- // locate groupby bucket
- List<Function> functions = group2FunctionMap.get(groupbyFieldValues);
- if(functions == null){
- functions = new ArrayList<Function>();
- for(AggregateFunctionType type : types){
- functions.add(_functionFactories.get(type.name()).createFunction());
- }
- group2FunctionMap.put(groupbyFieldValues, functions);
- }
- int functionIndex = 0;
- for(Double v : values){
- functions.get(functionIndex).run(v);
- functionIndex++;
- }
- }
-
- public Map<List<String>, List<Double>> result(){
- Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>();
- for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
- List<Double> values = new ArrayList<Double>();
- for(Function f : entry.getValue()){
- values.add(f.result());
- }
- result.put(entry.getKey(), values);
- }
- return result;
- }
-
- public List<GroupbyKeyValue> getGroupbyKeyValue(){
- List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>();
-
- for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){
- GroupbyKey key = new GroupbyKey();
- for(String keyStr:entry.getKey()){
- try {
- key.addValue(keyStr.getBytes(QueryConstants.CHARSET));
- } catch (UnsupportedEncodingException e) {
- LOG.error(e.getMessage(),e);
- }
- }
- GroupbyValue value = new GroupbyValue();
- for(Function f : entry.getValue()){
- value.add(f.result());
- value.addMeta(f.count());
- }
- results.add(new GroupbyKeyValue(key,value));
- }
-
- return results;
- }
-
- public static interface FunctionFactory{
- public Function createFunction();
- }
-
- public static abstract class Function{
- protected int count;
-
- public abstract void run(double v);
- public abstract double result();
- public int count(){
- return count;
- }
- public void incrCount(){
- count ++;
- }
- }
-
- private static class CountFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Count();
- }
- }
-
-
- private static class Count extends Sum{
- public Count(){
- super();
- }
- }
-
- private static class SumFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Sum();
- }
- }
-
- private static class Sum extends Function{
- private double summary;
- public Sum(){
- this.summary = 0.0;
- }
- @Override
- public void run(double v){
- this.incrCount();
- this.summary += v;
- }
-
- @Override
- public double result(){
- return this.summary;
- }
- }
-
- private static class MinFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Min();
- }
- }
- public static class Min extends Function{
- private double minimum;
- public Min(){
- // TODO is this a bug, or only positive numeric calculation is supported
- this.minimum = Double.MAX_VALUE;
- }
-
- @Override
- public void run(double v){
- if(v < minimum){
- minimum = v;
- }
- this.incrCount();
- }
-
- @Override
- public double result(){
- return minimum;
- }
- }
-
- private static class MaxFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Max();
- }
- }
- public static class Max extends Function{
- private double maximum;
- public Max(){
- // TODO is this a bug, or only positive numeric calculation is supported
- this.maximum = 0.0;
- }
- @Override
- public void run(double v){
- if(v > maximum){
- maximum = v;
- }
- this.incrCount();
- }
-
- @Override
- public double result(){
- return maximum;
- }
- }
-
- private static class AvgFactory implements FunctionFactory{
- @Override
- public Function createFunction(){
- return new Avg();
- }
- }
- public static class Avg extends Function{
- private double total;
- public Avg(){
- this.total = 0.0;
- }
- @Override
- public void run(double v){
- total += v;
- this.incrCount();
- }
- @Override
- public double result(){
- return this.total/this.count;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
deleted file mode 100644
index 17e1602..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * this is default comparator for aggregation. The behavior is to sort by groupby fields ascendantly
- */
-public class GroupbyFieldsComparator implements Comparator<List<String>>{
- @Override
- public int compare(List<String> list1, List<String> list2){
- if(list1 == null || list2 == null || list1.size() != list2.size())
- throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size");
- int r = 0;
- int index = 0;
- for(String s1 : list1){
- r = s1.compareTo(list2.get(index++));
- if(r != 0)
- return r;
- }
- return r;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
deleted file mode 100644
index 737af98..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-public class HierarchicalAggregateEntity {
- private String key;
- private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>();
- private List<Double> values = new ArrayList<Double>();
- private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>();
- private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null;
-
- public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() {
- return sortedList;
- }
- public void setSortedList(
- SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) {
- this.sortedList = sortedList;
- }
- public List<GroupbyBucket.Function> getTmpValues() {
- return tmpValues;
- }
- public void setTmpValues(List<GroupbyBucket.Function> tmpValues) {
- this.tmpValues = tmpValues;
- }
- public String getKey() {
- return key;
- }
- public void setKey(String key) {
- this.key = key;
- }
- public List<Double> getValues() {
- return values;
- }
- public void setValues(List<Double> values) {
- this.values = values;
- }
- public SortedMap<String, HierarchicalAggregateEntity> getChildren() {
- return children;
- }
- public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) {
- this.children = children;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
deleted file mode 100755
index e114d69..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/HierarchicalAggregator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.List;
-import java.util.SortedMap;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.query.aggregate.AggregateFunctionType;
-
-public class HierarchicalAggregator extends AbstractAggregator{
- private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity();
-
- public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
- super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
- }
-
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- List<Double> preAggregatedValues = createPreAggregatedValues(entity);
- // aggregate to root first
- addDatapoint(root, preAggregatedValues);
- // go through hierarchical tree
- HierarchicalAggregateEntity current = root;
- int i = 0;
- for(String groupbyField : groupbyFields){
- // determine groupbyFieldValue from tag or fields
- String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i);
- SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren();
- if(children.get(groupbyFieldValue) == null){
- HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity();
- children.put(groupbyFieldValue, tmp);
- }
- children.get(groupbyFieldValue).setKey(groupbyFieldValue);
- addDatapoint(children.get(groupbyFieldValue), preAggregatedValues);
- current = children.get(groupbyFieldValue);
- }
- }
-
- private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){
- List<GroupbyBucket.Function> functions = entity.getTmpValues();
- // initialize list of function
- if(functions.isEmpty()){
- for(AggregateFunctionType type : aggregateFunctionTypes){
- functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction());
- }
- }
- int functionIndex = 0;
- for(Double v : values){
- functions.get(functionIndex).run(v);
- functionIndex++;
- }
- }
-
- private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){
- for(GroupbyBucket.Function f : entity.getTmpValues()){
- entity.getValues().add(f.result());
- }
- for(HierarchicalAggregateEntity child : entity.getChildren().values()){
- finalizeHierarchicalAggregateEntity(child);
- }
- entity.setTmpValues(null);
- }
-
- public HierarchicalAggregateEntity result(){
- finalizeHierarchicalAggregateEntity(root);
- return this.root;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
deleted file mode 100644
index b048cf8..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-public class PostFlatAggregateSort {
- private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) {
- SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions));
- sortedEntries.addAll(map.entrySet());
- return sortedEntries;
- }
-
- /**
- * sort aggregated results with sort options
- * @param aggregatedResult aggregated result set, but it is not sorted
- * @sortOptions sorting options
- * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned
- */
- public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){
- SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions);
- List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>();
- for (Map.Entry<List<String>, List<Double>> entry : allList) {
- result.add(entry);
- if (topN > 0 && result.size() >= topN) {
- break;
- }
- }
- return result;
- }
-
- private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{
- private List<SortOption> sortOptions;
- public MapEntryComparator(List<SortOption> sortOptions){
- this.sortOptions = sortOptions;
- }
- /**
- * default to sort by all groupby fields
- */
- @Override
- public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){
- int r = 0;
- List<String> keyList1 = e1.getKey();
- List<Double> valueList1 = e1.getValue();
- List<String> keyList2 = e2.getKey();
- List<Double> valueList2 = e2.getValue();
- for(SortOption so : sortOptions){
- int index = so.getIndex();
- if (index == -1) {
- continue;
- }
- if(!so.isInGroupby()){ // sort fields come from functions
- Double value1 = valueList1.get(index);
- Double value2 = valueList2.get(index);
- r = value1.compareTo(value2);
- }else{ // sort fields come from groupby fields
- String key1 = keyList1.get(index);
- String key2 = keyList2.get(index);
- r = key1.compareTo(key2);
- }
- if(r == 0) continue;
- if(!so.isAscendant()){
- r = -r;
- }
- return r;
- }
- // default to sort by groupby fields ascendently
- if(r ==0){ // TODO is this check necessary
- return new GroupbyFieldsComparator().compare(keyList1, keyList2);
- }
- return r;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
deleted file mode 100644
index ba1218c..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-public class PostHierarchicalAggregateSort {
-
- private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) {
- SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions));
- sortedEntries.addAll(entity.getChildren().entrySet());
- return sortedEntries;
- }
-
- /**
- * sort aggregated results with sort options
- *
- * @param result
- * @param sortOptions
- * @return
- */
- public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){
- SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions);
- result.setSortedList(tmp);
- result.setChildren(null);
- for(Map.Entry<String, HierarchicalAggregateEntity> entry : tmp){
- sort(entry.getValue(), sortOptions);
- }
- return result;
- }
-
- private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{
- private List<SortOption> sortOptions;
-
- public MapEntryComparator(List<SortOption> sortOptions){
- this.sortOptions = sortOptions;
- }
-
- /**
- * default to sort by all groupby fields
- */
- @Override
- public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){
- int r = 0;
- String key1 = e1.getKey();
- List<Double> valueList1 = e1.getValue().getValues();
- String key2 = e2.getKey();
- List<Double> valueList2 = e2.getValue().getValues();
- for(SortOption so : sortOptions){
- int index = so.getIndex();
- if (index == -1) {
- continue;
- }
- if(!so.isInGroupby()){ // sort fields come from functions
- Double value1 = valueList1.get(index);
- Double value2 = valueList2.get(index);
- r = value1.compareTo(value2);
- }
- // sort fields come from groupby fields, then silently ignored
-
- if(r == 0) continue;
- if(!so.isAscendant()){
- r = -r;
- }
- return r;
- }
- // default to sort by groupby fields ascendently
- if(r ==0){
- return key1.compareTo(key2);
- }
- return r;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java
deleted file mode 100644
index e9b8733..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOption.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-/**
- * sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field3 desc
- * There are 2 SortOption object, then
- * the 1st one is inGroupby=false, index=0, ascendent=true
- * the 2nd one is inGroupby=true, index=1, ascendent=false
- *
- */
-public class SortOption {
- private boolean inGroupby; // sort field defaultly is not from groupby fields
- private int index; // index relative to list of groupby fields or list of functions
- private boolean ascendant; //asc or desc
-
- public boolean isInGroupby() {
- return inGroupby;
- }
- public void setInGroupby(boolean inGroupby) {
- this.inGroupby = inGroupby;
- }
- public int getIndex() {
- return index;
- }
- public void setIndex(int index) {
- this.index = index;
- }
- public boolean isAscendant() {
- return ascendant;
- }
- public void setAscendant(boolean ascendant) {
- this.ascendant = ascendant;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java
deleted file mode 100644
index a5829d3..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SortOptionsParser.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SortOptionsParser {
- private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class);
- private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$");
-
- public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){
- List<SortOption> list = new ArrayList<SortOption>();
- for(String sortOption : sortOptions){
- Matcher m = pattern.matcher(sortOption);
- if(!m.find()){
- throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc");
- }
- String field = m.group(1);
- if (sortFields != null) {
- sortFields.add(field);
- }
- SortOption so = new SortOption();
- list.add(so);
- so.setAscendant(m.group(2).equals("asc") ? true : false);
- int index = aggregatedFields.indexOf(field);
- if(index > -1){
- so.setInGroupby(false);
- so.setIndex(index);
- continue;
- }
- if(groupbyFields != null){ // if groupbyFields is not provided, ignore this sort field
- index = groupbyFields.indexOf(field);
- if(index > -1){
- so.setInGroupby(true);
- so.setIndex(index);
- continue;
- }
- }
- logNonExistingSortByField(field);
- so.setInGroupby(false);
- so.setIndex(-1);
- }
- return list;
- }
-
- private static void logNonExistingSortByField(String sortByField){
- LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
deleted file mode 100755
index 9380a52..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedAggregator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-public class SynchronizedAggregator implements Aggregator{
- private Object mutex = new Object();
- private Aggregator agg;
-
- public SynchronizedAggregator(Aggregator agg){
- this.agg = agg;
- }
-
- @Override
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- synchronized(mutex){
- agg.accumulate(entity);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
deleted file mode 100644
index 884a74a..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.EntityCreationListener;
-
-public class SynchronizedEntityCreationListener implements EntityCreationListener{
- private Object mutex = new Object();
- private EntityCreationListener listener;
-
- public SynchronizedEntityCreationListener(EntityCreationListener listener){
- this.listener = listener;
- }
-
- @Override
- public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
- synchronized(mutex){
- listener.entityCreated(entity);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
deleted file mode 100755
index 5cf6ab4..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.query.aggregate.AggregateFunctionType;
-import eagle.query.aggregate.raw.GroupbyKeyAggregatable;
-import eagle.query.aggregate.raw.GroupbyKeyValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would
- * save memory for holding all the data in the memory
- *
- * <h3>Aggregate Bucket Structure</h3>
- * <pre>
- * {
- * ["key<SUB>1</SUB>","key<SUB>2</SUB>",...,(entity.getTimestamp() - startTime)/intervalms]:[value<SUB>1</SUB>,value<SUB>2</SUB>,...,value<SUB>n</SUB>]
- * }
- * </pre>
- *
- */
-public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable {
- private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class);
- private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000;
- private long startTime;
- private long endTime;
- private long intervalms;
- private int numFunctions;
- private int ignoredEntityCounter = 0;
-
- public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields,
- long startTime, long endTime, long intervalms){
- super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
- // guard to avoid too many data points returned
-// validateTimeRange(startTime, endTime, intervalms);
- this.startTime = startTime;
- this.endTime = endTime;
- this.intervalms = intervalms;
- this.numFunctions = aggregateFuntionTypes.size();
- }
-
-// @Deprecated
-// public static void validateTimeRange(long startTime, long endTime, long intervalms){
-// if(startTime >= endTime || intervalms <= 0){
-// throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms);
-// }
-// if((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT){
-// throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + ", current # of datapoints is " + (endTime-startTime)/intervalms);
-// }
-// }
-
- public void accumulate(TaggedLogAPIEntity entity) throws Exception{
- List<String> groupbyFieldValues = createGroup(entity);
- // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side
- // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime
- if(entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime){
- if(LOG.isDebugEnabled()) LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime);
- this.ignoredEntityCounter ++;
- return;
- }
- // time series bucket index
- long located =(entity.getTimestamp() - startTime)/intervalms;
- groupbyFieldValues.add(String.valueOf(located));
- List<Double> preAggregatedValues = createPreAggregatedValues(entity);
- bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
- }
-
- public Map<List<String>, List<Double>> result(){
- if(this.ignoredEntityCounter > 0)
- LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
- return bucket.result();
- }
-
- /**
- * Support new aggregate result
- *
- * @return
- */
- @Override
- public List<GroupbyKeyValue> getGroupbyKeyValues(){
- if(this.ignoredEntityCounter > 0)
- LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime);
- return bucket.getGroupbyKeyValue();
- }
-
- public Map<List<String>, List<double[]>> getMetric(){
- // groupbyfields+timeseriesbucket --> aggregatedvalues for different function
- Map<List<String>, List<Double>> result = bucket.result();
-// Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
-// /**
-// * bug fix: startTime is inclusive and endTime is exclusive
-// */
-//// int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-// int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
-// for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
-// // get groups
-// List<String> groupbyFields = entry.getKey();
-// List<String> copy = new ArrayList<String>(groupbyFields);
-// String strTimeseriesIndex = copy.remove(copy.size()-1);
-// List<double[]> functionValues = timeseriesDatapoints.get(copy);
-// if(functionValues == null){
-// functionValues = new ArrayList<double[]>();
-// timeseriesDatapoints.put(copy, functionValues);
-// for(int i=0; i<numFunctions; i++){
-// functionValues.add(new double[numDatapoints]);
-// }
-// }
-// int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
-// int functionIndex = 0;
-// for(double[] values : functionValues){
-// values[timeseriesIndex] = entry.getValue().get(functionIndex);
-// functionIndex++;
-// }
-// }
-// return timeseriesDatapoints;
- return toMetric(result,(int)((endTime-1-startTime)/intervalms + 1),this.numFunctions);
- }
-
- public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){
- Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>();
- /**
- * bug fix: startTime is inclusive and endTime is exclusive
- */
-// int numDatapoints =(int)((endTime-startTime)/intervalms + 1);
-// int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1);
- for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){
- // get groups
- List<String> groupbyFields = entry.getKey();
- List<String> copy = new ArrayList<String>(groupbyFields);
- String strTimeseriesIndex = copy.remove(copy.size()-1);
- List<double[]> functionValues = timeseriesDatapoints.get(copy);
- if(functionValues == null){
- functionValues = new ArrayList<double[]>();
- timeseriesDatapoints.put(copy, functionValues);
- for(int i=0; i<numFunctions; i++){
- functionValues.add(new double[numDatapoints]);
- }
- }
- int timeseriesIndex = Integer.valueOf(strTimeseriesIndex);
- int functionIndex = 0;
- for(double[] values : functionValues){
- values[timeseriesIndex] = entry.getValue().get(functionIndex);
- functionIndex++;
- }
- }
- return timeseriesDatapoints;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
deleted file mode 100644
index 3b95c8f..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesBucket.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * only numeric aggregation is supported and number type supported is double
- */
-public class TimeSeriesBucket {
- private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class);
- private long startTime;
- private long endTime;
- private long interval;
-
- // map of aggregation function to aggregated values
- List<double[]> aggregatedValues = new ArrayList<double[]>();
-
- // align from the startTime
- /**
- *
- * @param startTime milliseconds
- * @param endTime milliseconds
- * @param intervalMillseconds
- * @param aggFunctions
- */
- public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){
- int count =(int)((endTime-startTime)/intervalms);
- for(int i=0; i<numAggFunctions; i++){
- aggregatedValues.add(new double[count]);
- }
- }
-
- /**
- * add datapoint which has a list of values for different aggregate functions
- * for example, sum(numHosts), count(*), avg(timespan) etc
- * @param timestamp
- * @param values
- */
- public void addDataPoint(long timestamp, List<Double> values){
- // locate timeseries bucket
- if(timestamp < startTime || timestamp > endTime){
- LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime);
- return;
- }
- int located =(int)((timestamp - startTime)/interval);
- int index = 0;
- for(Double src : values){
- double[] timeSeriesValues = aggregatedValues.get(index);
- timeSeriesValues[located] += src;
- index++;
- }
- }
-
- public List<double[]> aggregatedValues(){
- return this.aggregatedValues;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
deleted file mode 100644
index 3c75e52..0000000
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.query.aggregate.timeseries;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-public class TimeSeriesPostFlatAggregateSort {
- // private static final Logger logger =
- // LoggerFactory.getLogger(PostFlatAggregateSort.class);
-
- private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(
- Map<List<String>, List<Double>> mapForSort,
- List<SortOption> sortOptions) {
- SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(
- new MapEntryComparator(sortOptions));
- sortedEntries.addAll(mapForSort.entrySet());
- return sortedEntries;
- }
-
- /**
- * sort aggregated results with sort options
- *
- * @param entity
- */
- public static List<Map.Entry<List<String>, List<double[]>>> sort(
- Map<List<String>, List<Double>> mapForSort,
- Map<List<String>, List<double[]>> valueMap,
- List<SortOption> sortOptions, int topN) {
-
- processIndex(sortOptions);
- List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>();
- SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue(
- mapForSort, sortOptions);
- for (Map.Entry<List<String>, List<Double>> entry : sortedSet) {
- List<String> key = entry.getKey();
- List<double[]> value = valueMap.get(key);
- if (value != null) {
- Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value);
- result.add(newEntry);
- if (topN > 0 && result.size() >= topN) {
- break;
- }
- }
- }
- return result;
- }
-
- private static void processIndex(List<SortOption> sortOptions) {
- for (int i = 0; i < sortOptions.size(); ++i) {
- SortOption so = sortOptions.get(i);
- so.setIndex(i);
- }
- }
-
- private static class MapEntryComparator implements
- Comparator<Map.Entry<List<String>, List<Double>>> {
- private List<SortOption> sortOptions;
-
- public MapEntryComparator(List<SortOption> sortOptions) {
- this.sortOptions = sortOptions;
- }
-
- /**
- * default to sort by all groupby fields
- */
- @Override
- public int compare(Map.Entry<List<String>, List<Double>> e1,
- Map.Entry<List<String>, List<Double>> e2) {
- int r = 0;
- List<String> keyList1 = e1.getKey();
- List<Double> valueList1 = e1.getValue();
- List<String> keyList2 = e2.getKey();
- List<Double> valueList2 = e2.getValue();
- for (SortOption so : sortOptions) {
- int index = so.getIndex();
- if (index == -1) {
- continue;
- }
- if (!so.isInGroupby()) { // sort fields come from functions
- Double value1 = valueList1.get(index);
- Double value2 = valueList2.get(index);
- r = value1.compareTo(value2);
- } else { // sort fields come from groupby fields
- String key1 = keyList1.get(index);
- String key2 = keyList2.get(index);
- r = key1.compareTo(key2);
- }
- if (r == 0)
- continue;
- if (!so.isAscendant()) {
- r = -r;
- }
- return r;
- }
- // default to sort by groupby fields ascendently
- if (r == 0) { // TODO is this check necessary
- return new GroupbyFieldsComparator()
- .compare(keyList1, keyList2);
- }
- return r;
- }
- }
-
- static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable {
- private final K key;
- private final V value;
-
- ImmutableEntry(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public V getValue() {
- return value;
- }
-
- @Override
- public final V setValue(V value) {
- throw new UnsupportedOperationException();
- }
-
- private static final long serialVersionUID = 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
new file mode 100755
index 0000000..1319f43
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericEntityQuery.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @since : 10/30/14,2014
+ */
+public class GenericEntityQuery implements GenericQuery,EntityCreationListener {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericEntityQuery.class);
+
+ private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+ private StreamReader reader;
+
+ public GenericEntityQuery(String serviceName, SearchCondition condition, String metricName) throws IllegalAccessException, InstantiationException {
+ if(serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){
+ if(LOG.isDebugEnabled()) LOG.debug("List metric query");
+ if(metricName == null || metricName.isEmpty()){
+ throw new IllegalArgumentException("metricName should not be empty for metric list query");
+ }
+ if(!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)){
+ condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD);
+ }
+ reader = new GenericEntityStreamReader(serviceName, condition,metricName);
+ }else{
+ if(LOG.isDebugEnabled()) LOG.debug("List entity query");
+ reader = new GenericEntityStreamReader(serviceName, condition);
+ }
+ reader.register(this);
+ }
+
+ @Override
+ public long getLastTimestamp() {
+ return reader.getLastTimestamp();
+ }
+
+ @Override
+ public void entityCreated(TaggedLogAPIEntity entity){
+ entities.add(entity);
+ }
+
+ @Override
+ public List<TaggedLogAPIEntity> result() throws Exception{
+ if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
+ reader.readAsStream();
+ return entities;
+ }
+
+ @Override
+ public long getFirstTimeStamp() {
+ return reader.getFirstTimestamp();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
new file mode 100755
index 0000000..d3af151
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/GenericQuery.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.query;
+
+import java.util.List;
+
+/**
+ * @since : 10/30/14,2014
+ */
+public interface GenericQuery {
+ /**
+ * Throw all exceptions to http server
+ *
+ * @param <T> result entity type
+ * @return result entities list
+ *
+ * @throws Exception
+ */
+ <T> List<T> result() throws Exception;
+
+ /**
+ * Get last/largest timestamp on all rows
+ *
+ * @return last timestamp
+ */
+ long getLastTimestamp();
+
+ /**
+ * Get first timestamp on all rows
+ */
+ long getFirstTimeStamp();
+}
\ No newline at end of file