Posted to dev@eagle.apache.org by "Liangfei.Su" <su...@gmail.com> on 2015/12/24 04:17:15 UTC

StreamAnalyze /Dynamic aggregation

This is regarding EAGLE-79. Below is a summary of some of the discussion among
the Eagle contributors; we would like to hear more input.

Please advise.

1.  aggregation policy
Given any event schema, describe the aggregation and translate it into the
Siddhi query language.
note1: support time down-sampling and spatial rollup
note2: metric semantic types: counter (the change between two snapshots),
gauge (the value at a snapshot), total (the cumulative value from start up to
this snapshot)
note3: does Siddhi's external-time window support time batching?
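
To make the three semantic types in note2 concrete, here is a minimal Java sketch of deriving counter values from successive "total" snapshots. The class and method names are hypothetical and not part of Eagle, and the restart handling is an assumption of this sketch only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Eagle): derives counters from "total" snapshots. */
final class MetricSemantics {
    /** A counter is the increase between two consecutive snapshots of a "total" metric. */
    static List<Long> totalsToCounters(long[] totals) {
        List<Long> counters = new ArrayList<>();
        for (int i = 1; i < totals.length; i++) {
            long delta = totals[i] - totals[i - 1];
            // Assumption: a negative delta means the source restarted, so the
            // new total is taken as the count accumulated since the restart.
            counters.add(delta >= 0 ? delta : totals[i]);
        }
        return counters;
    }

    public static void main(String[] args) {
        long[] totals = {100, 130, 190, 20};
        System.out.println(totalsToCounters(totals));
    }
}
```

The distinction matters for down-sampling: counters within a window can be summed, while for a gauge or a total the usual choice is an average or the last snapshot in the window.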

Ralph: external time batch could be implemented as an extension to Siddhi.
I am putting the implementation into https://github.com/wso2/siddhi/issues/76.
The existing aggregation functions are OK.
A query would look like:
        String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
        String query = "@info(name = 'downSample') "
                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) "
                + "select "
                + "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
                + " '|' as s, "
                + " avg(memory) as avgMem, max(memory) as maxMem, min(memory) as minMem, "
                + " '|' as s1, "
                + " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
                + " '|' as s2, "
                + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, "
                + " '|' as s3, "
                + " timestamp as timeWindowEnds, "
                + " '|' as s4, "
                + " count(1) as metric_count "
                + " INSERT INTO tmp;";
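
For readers unfamiliar with the idea, the behavior expected from an external time batch window can be sketched in plain Java as below. This is only an illustration under stated assumptions, not the Siddhi extension: the class name, the Summary type and the window alignment rule are all hypothetical, events are assumed to arrive in timestamp order, and only the cpu field is aggregated.

```java
/** Hypothetical sketch of batching by event time; not the Siddhi implementation. */
final class ExternalTimeBatchSketch {
    /** Aggregates of one closed window. */
    static final class Summary {
        final long windowStart;
        final double avgCpu;
        final int maxCpu;
        final int minCpu;
        final int count;

        Summary(long windowStart, double avgCpu, int maxCpu, int minCpu, int count) {
            this.windowStart = windowStart;
            this.avgCpu = avgCpu;
            this.maxCpu = maxCpu;
            this.minCpu = minCpu;
            this.count = count;
        }
    }

    private final long windowMillis;
    private boolean started;
    private long windowStart;
    private long sum;
    private int max;
    private int min;
    private int count;

    ExternalTimeBatchSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Feeds one event; returns the summary of the window it closes, or null. */
    Summary onEvent(long timestamp, int cpu) {
        Summary closed = null;
        if (!started) {
            // Assumption: the first event's own timestamp opens the first window.
            started = true;
            windowStart = timestamp;
        } else if (timestamp >= windowStart + windowMillis) {
            // The event's timestamp, not the wall clock, closes the window.
            closed = new Summary(windowStart, (double) sum / count, max, min, count);
            // Skip forward by whole windows so that empty windows are passed over.
            windowStart += ((timestamp - windowStart) / windowMillis) * windowMillis;
            sum = 0;
            count = 0;
        }
        if (count == 0) {
            max = cpu;
            min = cpu;
        } else {
            max = Math.max(max, cpu);
            min = Math.min(min, cpu);
        }
        sum += cpu;
        count++;
        return closed;
    }

    public static void main(String[] args) {
        ExternalTimeBatchSketch window = new ExternalTimeBatchSketch(10_000);
        window.onEvent(1_000, 10);
        window.onEvent(5_000, 30);
        Summary s = window.onEvent(12_000, 50); // closes the window that began at 1000
        System.out.println(s.avgCpu + " " + s.maxCpu + " " + s.minCpu + " " + s.count);
    }
}
```

Because the window advances only when an event arrives, a replayed or delayed stream produces the same batches as a live one, which is the reason for batching on the event's timestamp field.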


2. aggregation optimization
A single metric may need many aggregation policies, for example multiple
aggregation functions (sum, avg, min, max) or multiple combinations of
spatial dimensions.
For example:
agg1: select avg(value), min(value), max(value) where metricType='xxxx'
from xxx-stream group by host for every 1 hour
agg2: select avg(value), min(value), max(value) where metricType='xxxx'
from xxx-stream group by site for every 1 hour
One optimization would be to run agg2 on the output of agg1, because
semantically one site consists of many hosts.
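
One caveat when chaining agg2 after agg1: min and max roll up directly, but an average of host averages is not the site average unless every host has the same number of samples. A minimal Java sketch of one way around this is to have the host level carry sum and count. All names here are hypothetical, and the host-to-site map is assumed to be available and complete.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: site-level rollup computed from host-level partial aggregates. */
final class RollupSketch {
    /** Mergeable partial aggregate; keeps sum and count so avg stays exact after merging. */
    static final class Partial {
        final double sum;
        final long count;
        final double min;
        final double max;

        Partial(double sum, long count, double min, double max) {
            this.sum = sum;
            this.count = count;
            this.min = min;
            this.max = max;
        }

        /** Builds a partial from raw values; assumes at least one value. */
        static Partial of(double... values) {
            double sum = 0;
            double min = values[0];
            double max = values[0];
            for (double v : values) {
                sum += v;
                min = Math.min(min, v);
                max = Math.max(max, v);
            }
            return new Partial(sum, values.length, min, max);
        }

        Partial merge(Partial other) {
            return new Partial(sum + other.sum, count + other.count,
                    Math.min(min, other.min), Math.max(max, other.max));
        }

        double avg() {
            return sum / count;
        }
    }

    /** agg2 on top of agg1: merges per-host partials into per-site partials. */
    static Map<String, Partial> rollUpToSite(Map<String, Partial> byHost,
                                             Map<String, String> hostToSite) {
        Map<String, Partial> bySite = new HashMap<>();
        for (Map.Entry<String, Partial> e : byHost.entrySet()) {
            // Assumption: every host has an entry in hostToSite.
            bySite.merge(hostToSite.get(e.getKey()), e.getValue(), Partial::merge);
        }
        return bySite;
    }

    public static void main(String[] args) {
        Map<String, Partial> byHost = new HashMap<>();
        byHost.put("host1", Partial.of(1, 3));
        byHost.put("host2", Partial.of(10));
        Map<String, String> hostToSite = new HashMap<>();
        hostToSite.put("host1", "siteA");
        hostToSite.put("host2", "siteA");
        Partial site = rollUpToSite(byHost, hostToSite).get("siteA");
        System.out.println(site.avg() + " " + site.min + " " + site.max);
    }
}
```

In the example data the site average is 14/3, whereas averaging the two host averages (2 and 10) would give 6.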


3. data partition
Because we can't predict which aggregation policies users will create, it is
hard to choose a data partitioning scheme.
For example, if you partition the data by host, you can easily aggregate by
host without merging streams, because one host's data will always enter the
same task.
At the least, we can partition data by "host, component, metricType".
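
A minimal Java sketch of such a partition function follows. The class and method names are hypothetical; in a Storm topology a fields grouping on these three fields would give the same guarantee that equal keys reach the same task.

```java
import java.util.Objects;

/** Hypothetical sketch: routes events with the same key to the same task. */
final class PartitionSketch {
    /** Maps a (host, component, metricType) key to a task index in [0, numTasks). */
    static int taskFor(String host, String component, String metricType, int numTasks) {
        int hash = Objects.hash(host, component, metricType);
        // floorMod keeps the index non-negative even when the hash is negative.
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        int first = taskFor("host1", "namenode", "cpu", 8);
        int second = taskFor("host1", "namenode", "cpu", 8);
        System.out.println(first == second);
    }
}
```

With this key, a per-host aggregation needs no merge step, while a per-site aggregation still has to combine results from several tasks.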


4. turn off storm ack
Aggregation can sometimes take a long time, so it may be necessary to turn
off Storm acking for such tasks; otherwise Storm will repeatedly report tuple
timeouts.



Ralph