You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/05 16:05:27 UTC

incubator-eagle git commit: stream tumbling window aggregate library Author: yonzhang2012@apache.org Reviewer: Hao Chen https://issues.apache.org/jira/browse/EAGLE-410

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 2115d0ad5 -> 1d842563a


stream tumbling window aggregate library
Author: yonzhang2012@apache.org
Reviewer: Hao Chen
https://issues.apache.org/jira/browse/EAGLE-410


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1d842563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1d842563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1d842563

Branch: refs/heads/develop
Commit: 1d842563ae7a7d0416a58324eb75760dbdc8d06f
Parents: 2115d0a
Author: yonzhang <yo...@gmail.com>
Authored: Fri Aug 5 09:08:57 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Fri Aug 5 09:08:57 2016 -0700

----------------------------------------------------------------------
 .../alert/siddhi/TestExternalBatchWindow.java   |  36 ++--
 .../src/main/resources/eagle.siddhiext          |   3 +-
 .../impl/ApplicationProviderSPILoader.java      |   6 +-
 eagle-core/eagle-common/pom.xml                 |   4 +
 .../java/org/apache/eagle/common/agg/Agg.java   |  26 +++
 .../eagle/common/agg/AggregateHandler.java      |  29 +++
 .../eagle/common/agg/AggregateResult.java       |  61 ++++++
 .../org/apache/eagle/common/agg/Column.java     |  25 +++
 .../org/apache/eagle/common/agg/Groupby.java    |  26 +++
 .../eagle/common/agg/SiddhiAggregator.java      | 143 ++++++++++++++
 .../eagle/common/agg/StreamAggregator.java      | 101 ++++++++++
 .../eagle/common/agg/StreamDefinition.java      |  26 +++
 .../eagle/common/agg/TimeBatchAggSpec.java      |  27 +++
 .../eagle/common/agg/TimeBatchWindowSpec.java   |  31 ++++
 .../eagle/common/TestSiddhiAggregator.java      | 111 +++++++++++
 .../eagle/common/agg/TestSiddhiAggregator.java  |  93 ++++++++++
 .../common/agg/TestSiddhiExternalTimeBatch.java | 114 ++++++++++++
 .../eagle/common/agg/TestStreamAggregator.java  |  65 +++++++
 .../ExternalTimeBatchWindowProcessor.java       | 184 -------------------
 .../src/main/resources/eagle.siddhiext          |   2 -
 .../hadoop/metric/TestHadoopMetricSiddhiQL.java |  43 ++++-
 .../eagle-security-hbase-auditlog/pom.xml       |   5 +
 .../hbase/HBaseAuditLogApplication.java         |  62 +++++++
 .../src/main/resources/metadata.xml             |  91 +++++++++
 eagle-server/pom.xml                            |   2 +-
 pom.xml                                         |   2 +-
 26 files changed, 1106 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
index 7903e0d..e289793 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
@@ -41,12 +41,12 @@ import org.wso2.siddhi.core.stream.output.StreamCallback;
 public class TestExternalBatchWindow {
 
     private static SiddhiManager siddhiManager;
-    
+
     @BeforeClass
     public static void beforeClass() {
         siddhiManager = new SiddhiManager();
     }
-    
+
     @AfterClass
     public static void afterClass() {
         siddhiManager.shutdown();
@@ -75,7 +75,7 @@ public class TestExternalBatchWindow {
         for (int i = 0; i < length; i++) {
             input.send(new Object[] { 15, now + i * 1000 });
         }
-        
+
         Thread.sleep(1000);
         Assert.assertFalse("Event happens inner external time batch window, should not have event recieved in callback!", recieved.get());
 
@@ -83,9 +83,9 @@ public class TestExternalBatchWindow {
     }
 
     private ExecutionPlanRuntime simpleQueryRuntime() {
-        String query = "define stream jmxMetric(cpu int, timestamp long); " 
+        String query = "define stream jmxMetric(cpu int, timestamp long); "
                 + "@info(name='query')"
-                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) " 
+                + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
                 + "select avg(cpu) as avgCpu, count(1) as count insert into tmp;";
 
         return siddhiManager.createExecutionPlanRuntime(query);
@@ -108,7 +108,7 @@ public class TestExternalBatchWindow {
     public void test05ExternalJoin() {
         // TODO
     }
-    
+
     @Test
     public void test06EdgeCase() throws Exception {
         // every 10 sec
@@ -141,7 +141,7 @@ public class TestExternalBatchWindow {
         for (int i = 0; i < length; i++) {
             input.send(new Object[] { 15, now + i * 10 });
         }
-        
+
         // second round
         // if the trigger event mix with the last window, we should see the avgValue is not expected
         for (int i = 0; i < length; i++) {
@@ -149,11 +149,11 @@ public class TestExternalBatchWindow {
         }
         // to trigger second round
         input.send(new Object[] { 10000, now + 10 * 10000 });
-        
+
 //        latch.await();// for debug
 
         Thread.sleep(1000);
-        
+
         Assert.assertEquals(2, recCount.get());
     }
 
@@ -162,11 +162,11 @@ public class TestExternalBatchWindow {
         String defaultStream = "define stream LoginEvents (myTime long, ip string, phone string,price int);";
 
         String query = " @info(name='pull76') "
-                + " from LoginEvents#window.eagle:externalTimeBatch(myTime, 5 sec)  "
+                + " from LoginEvents#window.externalTimeBatch(myTime, 5 sec)  "
                 + " select myTime, phone, ip, price, count(ip) as cntip , "
                 + " min(myTime) as mintime, max(myTime) as maxtime "
                 + " insert into events ;";
-        
+
         ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(defaultStream + query);
 
         InputHandler inputHandler = runtime.getInputHandler("LoginEvents");
@@ -194,10 +194,10 @@ public class TestExternalBatchWindow {
                 }
             }
         });
-        
-        
+
+
         runtime.start();
-        
+
         long start = System.currentTimeMillis();
         Calendar c = Calendar.getInstance();
         c.add(Calendar.HOUR, 1);
@@ -214,12 +214,12 @@ public class TestExternalBatchWindow {
         Thread.sleep(1000);
         runtime.shutdown();
     }
-    
+
     @Test
     public void test01DownSampling() throws Exception {
         String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
-        String query = "@info(name = 'downSample') " 
-                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) "
+        String query = "@info(name = 'downSample') "
+                + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
                 + "select "
                 + "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
                 + " '|' as s, "
@@ -227,7 +227,7 @@ public class TestExternalBatchWindow {
                 + " '|' as s1, "
                 + " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
                 + " '|' as s2, "
-                + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, " 
+                + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, "
                 + " '|' as s3, "
                 + " timestamp as timeWindowEnds, "
                 + " '|' as s4, "

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
index 506bad9..4ce9805 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
@@ -15,5 +15,4 @@
 # limitations under the License.
 #
 
-# externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
-collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
\ No newline at end of file
+collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
index 42285b3..977b017 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -67,17 +67,17 @@ public class ApplicationProviderSPILoader extends ApplicationProviderLoader{
         } else {
             LOG.info("Loading application providers from context class loader");
             ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-            loadProviderFromClassLoader(classLoader,(applicationProviderConfig) -> DynamicJarPathFinder.findPath(applicationProviderConfig.getClass()));
+            loadProviderFromClassLoader(classLoader,(applicationProvider) -> DynamicJarPathFinder.findPath(applicationProvider.getClass()));
         }
     }
 
-    private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProviderConfig,String> jarFileSupplier){
+    private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProvider,String> jarFileSupplier){
         ServiceLoader<ApplicationProvider> serviceLoader = ServiceLoader.load(ApplicationProvider.class, jarFileClassLoader);
         for (ApplicationProvider applicationProvider : serviceLoader) {
             try {
                 ApplicationProviderConfig providerConfig = new ApplicationProviderConfig();
                 providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
-                providerConfig.setJarPath(jarFileSupplier.apply(providerConfig));
+                providerConfig.setJarPath(jarFileSupplier.apply(applicationProvider));
                 applicationProvider.prepare(providerConfig, getConfig());
                 registerProvider(applicationProvider);
             }catch (Throwable ex){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 33ed55d..4591324 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -32,6 +32,10 @@
 
 	<dependencies>
 		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-core</artifactId>
+		</dependency>
+		<dependency>
 			<groupId>commons-configuration</groupId>
 			<artifactId>commons-configuration</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java
new file mode 100644
index 0000000..5e26fd2
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+/**
+ * Since 8/3/16.
+ */
+public class Agg {
+    String field;
+    String function;
+    String alias;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java
new file mode 100644
index 0000000..59324c2
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Since 8/4/16.
+ */
+public interface AggregateHandler {
+    void onAggregate(List<AggregateResult> result);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java
new file mode 100644
index 0000000..bb0ac5d
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java
@@ -0,0 +1,61 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 8/4/16.
+ * schema is : gbfield, gbfield, ..., aggField, aggField, ...
+ */
+public class AggregateResult {
+    private Object[] data;
+    private Map<String, Integer> colIndices;
+    private List<String> colNames;
+    public AggregateResult(Object[] data, Map<String, Integer> colIndices, List<String> colNames){
+        this.data = data;
+        this.colIndices = colIndices;
+        this.colNames = colNames;
+    }
+
+    public Object get(int index){
+        return data[index];
+    }
+
+    public Object get(String fieldName){
+        int index = colIndices.get(fieldName);
+        return get(index);
+    }
+
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("{");
+        for(int i=0; i<data.length; i++){
+            sb.append(colNames.get(i));
+            sb.append("=");
+            sb.append(data[i]);
+            sb.append(",");
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append("}");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java
new file mode 100644
index 0000000..00a11d6
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+/**
+ * Since 8/3/16.
+ */
+public class Column {
+    String name;
+    String type;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java
new file mode 100644
index 0000000..a74d677
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Since 8/3/16.
+ */
+public class Groupby {
+    List<String> cols;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
new file mode 100644
index 0000000..d0b3987
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 8/3/16.
+ */
+public class SiddhiAggregator {
+    private static final Logger LOG = LoggerFactory.getLogger(SiddhiAggregator.class);
+    private TimeBatchWindowSpec spec;
+    private StreamDefinition sd;
+    private InputHandler input;
+    public SiddhiAggregator(TimeBatchWindowSpec spec, StreamDefinition sd, final AggregateHandler handler){
+        this.spec = spec;
+        this.sd = sd;
+
+        Map<String, Integer> colIndices = new HashMap<>();
+        List<String> colNames = new ArrayList<>();
+        int i = 0;
+        for(String col : spec.groupby.cols){
+            colIndices.put(col, i++);
+            colNames.add(col);
+        }
+        for(Agg agg : spec.aggs){
+            colIndices.put(agg.alias, i++);
+            colNames.add(agg.alias);
+        }
+
+        String query = buildSiddhiAggQuery();
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(query);
+
+        input = runtime.getInputHandler("s");
+
+        runtime.addCallback("query", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                List<AggregateResult> rows = new ArrayList<AggregateResult>();
+                for(Event e : inEvents) {
+                    AggregateResult result = new AggregateResult(e.getData(), colIndices, colNames);
+                    rows.add(result);
+                }
+                handler.onAggregate(rows);
+            }
+        });
+        runtime.start();
+    }
+
+    public void add(Object[] data) throws Exception{
+        input.send(data);
+    }
+
+    /**
+     * example siddhi query
+     *   String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
+          "@info(name='query') " +
+          " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
+     * @return
+     */
+    private String buildSiddhiAggQuery(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("define stream s(");
+        if(sd.columns == null || sd.columns.size() == 0) {
+            throw new IllegalStateException("input stream should contains at least one column");
+        }
+        for(Column col : sd.columns){
+            appendColumnDef(sb, col);
+            sb.append(",");
+        }
+        sb.deleteCharAt(sb.length()-1);
+        sb.append(");");
+
+        sb.append(" @info(name='query') ");
+        sb.append("from s[");
+        sb.append(spec.filter);
+        sb.append("]#window.externalTimeBatch(");
+        sb.append(spec.timestampColumn);
+        sb.append(",");
+        sb.append(spec.windowDuration);
+        sb.append(",");
+        sb.append(spec.start);
+        sb.append(")");
+        sb.append(" select ");
+        for(String gbField : spec.groupby.cols){
+            sb.append(gbField);
+            sb.append(",");
+        }
+        if(spec.aggs == null){
+            throw new IllegalStateException("at least one aggregate function should be present");
+        }
+        for(Agg agg : spec.aggs){
+            sb.append(agg.function);
+            sb.append("(");
+            sb.append(agg.field);
+            sb.append(") as ");
+            sb.append(agg.alias);
+            sb.append(",");
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append(" group by ");
+        for(String gbField : spec.groupby.cols){
+            sb.append(gbField);
+            sb.append(",");
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append(" insert into tmp;");
+        LOG.info("query : " + sb.toString());
+        return sb.toString();
+    }
+
+    private void appendColumnDef(StringBuilder sb, Column col){
+        sb.append(col.name);
+        sb.append(" ");
+        sb.append(col.type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java
new file mode 100644
index 0000000..37802fd
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Since 8/4/16.
+ */
+public class StreamAggregator {
+    public static StreamAggregatorBuilder builder(){
+        return new StreamAggregatorBuilder();
+    }
+    public static class StreamAggregatorBuilder{
+        private Groupby gb = new Groupby();
+        private String windowDuration;
+        private long start;
+        private String filter;
+        private List<Agg> aggs = new ArrayList<>();
+        private String timestampColumn;
+        private StreamDefinition sd = new StreamDefinition();
+        private AggregateHandler handler;
+        public StreamAggregatorBuilder groupby(String ... gbFields){
+            gb.cols = Arrays.asList(gbFields);
+            return this;
+        }
+        public StreamAggregatorBuilder window(String windowDuration){
+            window(windowDuration, 0);
+            return this;
+        }
+        public StreamAggregatorBuilder window(String windowDuration, long start){
+            this.windowDuration = windowDuration;
+            this.start = start;
+            return this;
+        }
+        public StreamAggregatorBuilder timeColumn(String timestampColumn){
+            this.timestampColumn = timestampColumn;
+            return this;
+        }
+        public StreamAggregatorBuilder filter(String filter){
+            this.filter = filter;
+            return this;
+        }
+        public StreamAggregatorBuilder agg(String function, String field, String alias){
+            Agg agg = new Agg();
+            agg.function = function;
+            agg.field = field;
+            agg.alias = alias;
+            aggs.add(agg);
+            return this;
+        }
+        public StreamAggregatorBuilder columnDef(String colName, String colType){
+            Column col = new Column();
+            col.name = colName;
+            col.type = colType;
+            if(sd.columns == null){
+                sd.columns = new ArrayList<>();
+            }
+            sd.columns.add(col);
+            return this;
+        }
+        public StreamAggregatorBuilder streamDef(StreamDefinition sd){
+            this.sd = sd;
+            return this;
+        }
+        public StreamAggregatorBuilder aggregateHandler(AggregateHandler handler){
+            this.handler = handler;
+            return this;
+        }
+        public SiddhiAggregator build(){
+            TimeBatchWindowSpec spec = new TimeBatchWindowSpec();
+            spec.aggs = aggs;
+            spec.filter = filter;
+            spec.groupby = gb;
+            spec.start = start;
+            spec.timestampColumn = timestampColumn;
+            spec.windowDuration = windowDuration;
+            SiddhiAggregator aggregator = new SiddhiAggregator(spec, sd, handler);
+            return aggregator;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java
new file mode 100644
index 0000000..1489496
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Holder for the ordered list of columns that describe an input stream. Since 8/3/16.
+ */
+public class StreamDefinition {
+    List<Column> columns; // column sequence is significant; may be null until set (StreamAggregatorBuilder.columnDef checks for that)
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java
new file mode 100644
index 0000000..fdb269a
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+/**
+ * Plain field holder for a time-batch aggregation: a batch size, an offset, one group-by and one aggregate. Since 8/3/16.
+ */
+public class TimeBatchAggSpec {
+    long batchSize; // NOTE(review): the unit is not stated anywhere in this class - confirm
+    long offset; // NOTE(review): presumably the start offset of the batches - confirm
+    Groupby groupby;
+    Agg agg; // a single aggregate, whereas TimeBatchWindowSpec carries a list of them
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java
new file mode 100644
index 0000000..6b58b3a
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Settings for one time-batch window aggregation; filled in by StreamAggregatorBuilder.build() or assigned field by field. Since 8/3/16.
+ */
+public class TimeBatchWindowSpec {
+    Groupby groupby; // group-by definition (the tests set Groupby.cols to a list of column names)
+    List<Agg> aggs; // aggregates to compute
+    String filter; // filter expression as text; the tests use metric=="missingblocks"
+    String timestampColumn; // presumably the name of the column that carries the event time (tests use "timestamp") - confirm
+    String windowDuration; // duration as text; the tests use "1 min"
+    long start; // NOTE(review): presumably the window start time or offset (tests pass 0) - confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
new file mode 100644
index 0000000..86d6016
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs a raw Siddhi externalTimeBatch query (1 min, count per host) and checks the first two batches seen by the callback. Since 8/3/16.
+ */
+public class TestSiddhiAggregator {
+    @Test
+    public void testSiddhi() throws Exception{
+        String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
+                " @info(name='query') " +
+                " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; "; // the output is aliased "avg" although the aggregate is count(value)
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+
+        InputHandler input = runtime.getInputHandler("s");
+
+        AtomicInteger index = new AtomicInteger(0); // number of callback batches asserted so far
+
+        runtime.addCallback("query", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    printEvents(inEvents);
+                    if(index.get() == 0){ // first batch: expects the 3/4/2 per-host counts of the sendEvents call that starts at 1000
+                        Assert.assertEquals(3, inEvents.length);
+                        Assert.assertEquals("host1", inEvents[0].getData()[0]);
+                        Assert.assertEquals(3L, inEvents[0].getData()[1]);
+                        Assert.assertEquals("host2", inEvents[1].getData()[0]);
+                        Assert.assertEquals(4L, inEvents[1].getData()[1]);
+                        Assert.assertEquals("host3", inEvents[2].getData()[0]);
+                        Assert.assertEquals(2L, inEvents[2].getData()[1]);
+                        index.incrementAndGet();
+                    }else if(index.get() == 1){ // second batch: expects the 1/2/2 per-host counts of the call that starts at 61000
+                        Assert.assertEquals(3, inEvents.length);
+                        Assert.assertEquals("host1", inEvents[0].getData()[0]);
+                        Assert.assertEquals(1L, inEvents[0].getData()[1]);
+                        Assert.assertEquals("host2", inEvents[1].getData()[0]);
+                        Assert.assertEquals(2L, inEvents[1].getData()[1]);
+                        Assert.assertEquals("host3", inEvents[2].getData()[0]);
+                        Assert.assertEquals(2L, inEvents[2].getData()[1]);
+                        index.incrementAndGet();
+                    } // any later callback is printed but not asserted
+                }
+            });
+        runtime.start();
+
+        sendEvents(3, 4, 2, input, 1000L);
+        Thread.sleep(1000);
+        sendEvents(1, 2, 2, input, 61000L);
+        sendEvents(3, 10, 7, input, 121000L);
+        runtime.shutdown(); // NOTE(review): nothing asserts that index reached 2, so the test also passes if the callback never runs
+        sm.shutdown();
+        Thread.sleep(1000);
+    }
+
+    void sendEvents(int countHost1, int countHost2, int countHost3, InputHandler input, long startTime) throws Exception{ // sends the given number of events per host, in host1, host2, host3 order
+        for(int i=0; i<countHost1; i++){
+            Event e = createEvent("host1", startTime + i*100); // consecutive events of one host are 100 apart in timestamp
+            input.send(e);
+        }
+        startTime += 2000; // host2 timestamps begin 2000 after the original startTime
+        for(int i=0; i<countHost2; i++){
+            Event e = createEvent("host2", startTime + i*100);
+            input.send(e);
+        }
+        startTime += 4000; // host3 timestamps begin 6000 after the original startTime
+       for(int i=0; i<countHost3; i++){
+            Event e = createEvent("host3", startTime + i*100);
+            input.send(e);
+        }
+    }
+
+    void printEvents(Event[] inEvents){ // prints the events comma-separated on one line of stdout
+        for(Event e : inEvents) {
+            System.out.print(e);
+            System.out.print(",");
+        }
+        System.out.println();
+    }
+    Event createEvent(String host, long timestamp){ // builds an event with fixed metric "missingblocks", site "site1" and value 14.0
+        Event e = new Event();
+        e.setTimestamp(timestamp);
+        e.setData(new Object[]{host, timestamp, "missingblocks", "site1", 14.0}); // same order as the stream definition: host, timestamp, metric, site, value
+        return e;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java
new file mode 100644
index 0000000..75b50ea
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java
@@ -0,0 +1,93 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Builds a TimeBatchWindowSpec and StreamDefinition by hand, feeds rows to SiddhiAggregator and prints what the handler receives. Since 8/4/16.
+ */
+public class TestSiddhiAggregator {
+    @Test
+    public void test() throws Exception{
+        TimeBatchWindowSpec spec = new TimeBatchWindowSpec();
+        Agg agg = new Agg();
+        agg.field = "value";
+        agg.function = "avg";
+        agg.alias = "avg";
+        spec.aggs = Arrays.asList(agg);
+        spec.filter = "metric==\"missingblocks\"";
+        Groupby gb = new Groupby();
+        gb.cols = Arrays.asList("host");
+        spec.groupby = gb;
+        spec.start = 0L;
+        spec.timestampColumn = "timestamp";
+        spec.windowDuration = "1 min";
+
+        StreamDefinition sd = new StreamDefinition();
+        List<Column> columns = new ArrayList<>();
+        sd.columns = columns; // same list instance, so the add calls below also populate sd
+        Column host = new Column();
+        host.name = "host";
+        host.type = "string";
+        columns.add(host);
+        Column timestamp = new Column();
+        timestamp.name = "timestamp";
+        timestamp.type = "long";
+        columns.add(timestamp);
+        Column metric = new Column();
+        metric.name = "metric";
+        metric.type = "string";
+        columns.add(metric);
+        Column site = new Column();
+        site.name = "site";
+        site.type = "string";
+        columns.add(site);
+        Column value = new Column();
+        value.name = "value";
+        value.type = "double";
+        columns.add(value);
+
+        SiddhiAggregator aggregator = new SiddhiAggregator(spec, sd, new AggregateHandler() {
+            @Override
+            public void onAggregate(List<AggregateResult> result) {
+                System.out.println(result); // NOTE(review): results are only printed; this test contains no assertions
+            }
+        });
+
+        aggregator.add(new Object[]{"host1", 1000L, "missingblocks", "site1", 10.0}); // row layout follows the column order above: host, timestamp, metric, site, value
+        aggregator.add(new Object[]{"host2", 2000L, "missingblocks", "site1", 16.0});
+        aggregator.add(new Object[]{"host3", 2000L, "missingblocks", "site1", 11.0});
+        aggregator.add(new Object[]{"host1", 21000L, "missingblocks", "site1", 20.0});
+
+        aggregator.add(new Object[]{"host1", 61000L, "missingblocks", "site1", 14.0}); // second group of rows: timestamps 61000 to 63500
+        aggregator.add(new Object[]{"host2", 61500L, "missingblocks", "site1", 14.0});
+        aggregator.add(new Object[]{"host3", 62000L, "missingblocks", "site1", 13.0});
+        aggregator.add(new Object[]{"host2", 63500L, "missingblocks", "site1", 19.0});
+
+        aggregator.add(new Object[]{"host1", 121000L, "missingblocks", "site1", 14.0}); // third group of rows: timestamps 121000 to 122000
+        aggregator.add(new Object[]{"host2", 121000L, "missingblocks", "site1", 14.0});
+        aggregator.add(new Object[]{"host3", 122000L, "missingblocks", "site1", 13.0});
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
new file mode 100644
index 0000000..e59c0c4
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.common.agg;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs a raw Siddhi externalTimeBatch query (1 min, count per host) and checks the first two batches seen by the callback. Since 8/3/16.
+ */
+public class TestSiddhiExternalTimeBatch {
+    @Test
+    public void testSiddhi() throws Exception{
+        String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
+                " @info(name='query') " +
+                " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; "; // the output is aliased "avg" although the aggregate is count(value)
+        System.out.println("query: " + ql);
+        SiddhiManager sm = new SiddhiManager();
+        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+
+        InputHandler input = runtime.getInputHandler("s");
+
+        AtomicInteger index = new AtomicInteger(0); // number of callback batches asserted so far
+
+        runtime.addCallback("query", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    printEvents(inEvents);
+                    if(index.get() == 0){ // first batch: expects the 3/4/2 per-host counts of the sendEvents call that starts at 1000
+                        Assert.assertEquals(3, inEvents.length);
+                        Assert.assertEquals("host1", inEvents[0].getData()[0]);
+                        Assert.assertEquals(3L, inEvents[0].getData()[1]);
+                        Assert.assertEquals("host2", inEvents[1].getData()[0]);
+                        Assert.assertEquals(4L, inEvents[1].getData()[1]);
+                        Assert.assertEquals("host3", inEvents[2].getData()[0]);
+                        Assert.assertEquals(2L, inEvents[2].getData()[1]);
+                        index.incrementAndGet();
+                    }else if(index.get() == 1){ // second batch: expects the 1/2/2 per-host counts of the call that starts at 61000
+                        Assert.assertEquals(3, inEvents.length);
+                        Assert.assertEquals("host1", inEvents[0].getData()[0]);
+                        Assert.assertEquals(1L, inEvents[0].getData()[1]);
+                        Assert.assertEquals("host2", inEvents[1].getData()[0]);
+                        Assert.assertEquals(2L, inEvents[1].getData()[1]);
+                        Assert.assertEquals("host3", inEvents[2].getData()[0]);
+                        Assert.assertEquals(2L, inEvents[2].getData()[1]);
+                        index.incrementAndGet();
+                    } // any later callback is printed but not asserted
+                }
+            });
+        runtime.start();
+
+        sendEvents(3, 4, 2, input, 1000L);
+        Thread.sleep(1000);
+        sendEvents(1, 2, 2, input, 61000L);
+        sendEvents(3, 10, 7, input, 121000L);
+        runtime.shutdown(); // NOTE(review): nothing asserts that index reached 2, so the test also passes if the callback never runs
+        sm.shutdown();
+        Thread.sleep(1000);
+    }
+
+    void sendEvents(int countHost1, int countHost2, int countHost3, InputHandler input, long startTime) throws Exception{ // sends the given number of events per host, in host1, host2, host3 order
+        for(int i=0; i<countHost1; i++){
+            Event e = createEvent("host1", startTime + i*100); // consecutive events of one host are 100 apart in timestamp
+            input.send(e);
+        }
+        startTime += 2000; // host2 timestamps begin 2000 after the original startTime
+        for(int i=0; i<countHost2; i++){
+            Event e = createEvent("host2", startTime + i*100);
+            input.send(e);
+        }
+        startTime += 4000; // host3 timestamps begin 6000 after the original startTime
+       for(int i=0; i<countHost3; i++){
+            Event e = createEvent("host3", startTime + i*100);
+            input.send(e);
+        }
+    }
+
+    void printEvents(Event[] inEvents){ // prints the events comma-separated on one line of stdout
+        for(Event e : inEvents) {
+            System.out.print(e);
+            System.out.print(",");
+        }
+        System.out.println();
+    }
+    Event createEvent(String host, long timestamp){ // builds an event with fixed metric "missingblocks", site "site1" and value 14.0
+        Event e = new Event();
+        e.setTimestamp(timestamp);
+        e.setData(new Object[]{host, timestamp, "missingblocks", "site1", 14.0}); // same order as the stream definition: host, timestamp, metric, site, value
+        return e;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java
new file mode 100644
index 0000000..76b2264
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Configures a SiddhiAggregator through StreamAggregator.builder(), feeds it rows and prints what the handler receives. Since 8/4/16.
+ */
+public class TestStreamAggregator {
+    @Test
+    public void test() throws Exception{
+        SiddhiAggregator aggregator = StreamAggregator.builder()
+                .columnDef("host", "string") // columnDef order defines the layout of the rows passed to add(...) below
+                .columnDef("timestamp", "long")
+                .columnDef("metric", "string")
+                .columnDef("site", "string")
+                .columnDef("value", "double")
+                .filter("metric==\"missingblocks\"")
+                .groupby("host")
+                .agg("avg", "value", "avg") // arguments are function, field, alias
+                .timeColumn("timestamp")
+                .window("1 min", 0)
+                .aggregateHandler(new AggregateHandler() {
+                    @Override
+                    public void onAggregate(List<AggregateResult> result) {
+                        System.out.println(result); // NOTE(review): results are only printed; this test contains no assertions
+                    }
+                })
+                .build();
+
+        aggregator.add(new Object[]{"host1", 1000L, "missingblocks", "site1", 10.0}); // first group of rows: timestamps 1000 to 21000
+        aggregator.add(new Object[]{"host2", 2000L, "missingblocks", "site1", 16.0});
+        aggregator.add(new Object[]{"host3", 2000L, "missingblocks", "site1", 11.0});
+        aggregator.add(new Object[]{"host1", 21000L, "missingblocks", "site1", 20.0});
+
+        aggregator.add(new Object[]{"host1", 61000L, "missingblocks", "site1", 14.0}); // second group of rows: timestamps 61000 to 63500
+        aggregator.add(new Object[]{"host2", 61500L, "missingblocks", "site1", 14.0});
+        aggregator.add(new Object[]{"host3", 62000L, "missingblocks", "site1", 13.0});
+        aggregator.add(new Object[]{"host2", 63500L, "missingblocks", "site1", 19.0});
+
+        aggregator.add(new Object[]{"host1", 121000L, "missingblocks", "site1", 14.0}); // third group of rows: timestamps 121000 to 122000
+        aggregator.add(new Object[]{"host2", 121000L, "missingblocks", "site1", 14.0});
+        aggregator.add(new Object[]{"host3", 122000L, "missingblocks", "site1", 13.0});
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
deleted file mode 100644
index c70d1a1..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi.extension;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.event.ComplexEvent;
-import org.wso2.siddhi.core.event.ComplexEventChunk;
-import org.wso2.siddhi.core.event.MetaComplexEvent;
-import org.wso2.siddhi.core.event.stream.StreamEvent;
-import org.wso2.siddhi.core.event.stream.StreamEventCloner;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
-import org.wso2.siddhi.core.query.processor.Processor;
-import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
-import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
-import org.wso2.siddhi.core.table.EventTable;
-import org.wso2.siddhi.core.util.collection.operator.Finder;
-import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-import org.wso2.siddhi.query.api.expression.Expression;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Dec 23, 2015
- *
- */
-
-public class ExternalTimeBatchWindowProcessor extends WindowProcessor implements FindableProcessor {
-    
-    private long timeToKeep;
-
-    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<StreamEvent>();
-    private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk<StreamEvent>();
-    
-    static final Logger log = LoggerFactory.getLogger(ExternalTimeBatchWindowProcessor.class);
-    private VariableExpressionExecutor timeStampVariableExpressionExecutor;
-
-    private long lastSendTime = -1;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
-        this.expiredEventChunk = new ComplexEventChunk<StreamEvent>();
-        if (attributeExpressionExecutors.length == 2) {
-            if (attributeExpressionExecutors[1].getReturnType() == Attribute.Type.INT) {
-                timeToKeep = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue()));
-            } else {
-                timeToKeep = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue()));
-            }
-            if (!(attributeExpressionExecutors[0] instanceof VariableExpressionExecutor)) {
-                throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be a type long stream attribute but found " + attributeExpressionExecutors[0].getClass());
-            }
-            timeStampVariableExpressionExecutor = ((VariableExpressionExecutor) attributeExpressionExecutors[0]);
-            if (timeStampVariableExpressionExecutor.getReturnType() != Attribute.Type.LONG) {
-                throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be type long, but found " + timeStampVariableExpressionExecutor.getReturnType());
-            }
-        } else {
-            throw new ExecutionPlanValidationException("ExternalTime window should only have two parameter (<long> timeStamp, <int|long|time> windowTime), but found " + attributeExpressionExecutors.length + " input attributes");
-        }
-    }
-    
-    /**
-     * Here an assumption is taken: 
-     * Parameter: timestamp: The time which the window determines as current time and will act upon, 
-     *              the value of this parameter should be monotonically increasing. 
-     * from https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-externalTime
-     * 
-     */
-    @Override
-    protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
-        // event incoming trigger process. No events means no action
-        if (!streamEventChunk.hasNext()) {
-            return;
-        }
-
-        // for window beginning, if window is empty, set lastSendTime to incomingChunk first.
-        if (currentEventChunk.getFirst() == null && lastSendTime < 0) {
-            lastSendTime = (Long) streamEventChunk.getFirst().getAttribute(timeStampVariableExpressionExecutor.getPosition());
-        }
-
-        while(streamEventChunk.hasNext()) {
-            StreamEvent currStreamEvent = streamEventChunk.next();
-            if (currStreamEvent.getType() != ComplexEvent.Type.CURRENT) {
-                continue;
-            }
-            
-            long currentTime = (Long) currStreamEvent.getAttribute(timeStampVariableExpressionExecutor.getPosition());
-            if (currentTime < lastSendTime + timeToKeep) {
-                cloneAppend(streamEventCloner, currStreamEvent);
-            } else if (currentTime >= lastSendTime + timeToKeep) {
-                flushCurentChunk(nextProcessor, streamEventCloner, currentTime);
-                cloneAppend(streamEventCloner, currStreamEvent);
-            }
-        }
-    }
-
-    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent currStreamEvent) {
-        StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(currStreamEvent);
-        currentEventChunk.add(clonedStreamEvent);
-    }
-
-    private void flushCurentChunk(Processor nextProcessor, StreamEventCloner streamEventCloner, long currentTime) {
-        // need flush the currentEventChunk
-        currentEventChunk.reset();
-        ComplexEventChunk<StreamEvent> newEventChunk = new ComplexEventChunk<StreamEvent>();
-
-        // mark the timestamp for the expiredType event
-        while (expiredEventChunk.hasNext()) {
-            StreamEvent expiredEvent = expiredEventChunk.next();
-            expiredEvent.setTimestamp(currentTime);
-        }
-        // add expired event to newEventChunk too.
-        if (expiredEventChunk.getFirst() != null) {
-            newEventChunk.add(expiredEventChunk.getFirst());
-        }
-        
-        // make current event chunk as expired in expiredChunk
-        expiredEventChunk.clear();
-        while (currentEventChunk.hasNext()) {
-            StreamEvent currentEvent = currentEventChunk.next();
-            StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(currentEvent);
-            toExpireEvent.setType(StreamEvent.Type.EXPIRED);
-            expiredEventChunk.add(toExpireEvent);
-        }
-
-        // add current event chunk to next processor
-        if (currentEventChunk.getFirst() != null) {
-            newEventChunk.add(currentEventChunk.getFirst());
-        }
-        currentEventChunk.clear();
-
-        // update timestamp, call next processor
-        lastSendTime = currentTime;
-        if (newEventChunk.getFirst() != null) {
-            nextProcessor.process(newEventChunk);
-        }
-    }
-
-    public void start() {
-        //Do nothing
-    }
-
-    public void stop() {
-        //Do nothing
-    }
-
-    public Object[] currentState() {
-        return new Object[]{currentEventChunk, expiredEventChunk};
-    }
-
-    @SuppressWarnings("unchecked")
-    public void restoreState(Object[] state) {
-        currentEventChunk = (ComplexEventChunk<StreamEvent>) state[0];
-        expiredEventChunk = (ComplexEventChunk<StreamEvent>) state[1];
-    }
-
-    public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder finder) {
-        return finder.find(matchingEvent, expiredEventChunk, streamEventCloner);
-    }
-
-    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
-        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
index 2671c31..cce3aca 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
@@ -14,5 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
index 1a18655..c8096d6 100644
--- a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
@@ -117,7 +117,7 @@ public class TestHadoopMetricSiddhiQL {
         latch.await(10, TimeUnit.SECONDS);
         Thread.sleep(3000);
 
-        System.out.println(count.get());
+        System.out.println("callback count=" + count.get());
         if (eventHappenCount >= 0) {
             Assert.assertEquals(eventHappenCount, count.get());
         } else {
@@ -310,4 +310,45 @@ public class TestHadoopMetricSiddhiQL {
         return events;
     }
 
+    @Test
+    public void testNoActiveNamenodeFor3Times() throws Exception {
+        String sql = " define stream s (host string, timestamp long, metric string, component string, site string, value double); " +
+                " @info(name='query') " +
+                " from s[metric == \"hadoop.namenode.hastate.active.count\"]#window.length(3) select  metric, host, value, timestamp, component, site, avg(convert(value, \"long\")) as avgValue, count() as cnt having avgValue==0 and cnt==3  insert into tmp;";
+//        " from s[metric == \"hadoop.namenode.hastate.active.count\"]#window.length(3) select  metric, host, value, timestamp, component, site, min(convert(value, \"long\")) as minValue, max(convert(value, \"long\")) as maxValue, count() as cnt having minValue==0 and maxValue==0 and cnt==3  insert into tmp;";
+
+        System.out.println(sql);
+
+        testQL(sql, generateMBEvents_times_0(1), 0);
+        testQL(sql, generateMBEvents_times_0(2), 0);
+        testQL(sql, generateMBEvents_times_0(3), 1);
+    }
+
+    private List<Event> generateMBEvents_times_0(int times_0) {
+        List<Event> events = new LinkedList<>();
+
+        long base1 = System.currentTimeMillis();
+        double[] values = new double[3];
+        if(times_0 == 1){
+            values[0] = 1.0;
+            values[1] = 0.0;
+            values[2] = 1.0;
+        }else if(times_0 == 2){
+            values[0] = 1.0;
+            values[1] = 0.0;
+            values[2] = 0.0;
+        }else if(times_0 == 3){
+            values[0] = 0.0;
+            values[1] = 0.0;
+            values[2] = 0.0;
+        }
+        for(int i=0; i<3; i++) {
+            // master / slave in sync
+            base1 = base1 + 1000;
+            Event e = new Event();
+            e.setData(new Object[]{"a", base1, "hadoop.namenode.hastate.active.count", "namenode", "sandbox", values[i]});
+            events.add(e);
+        }
+        return events;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-security/eagle-security-hbase-auditlog/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/pom.xml b/eagle-security/eagle-security-hbase-auditlog/pom.xml
index a8b82c7..46d67b5 100644
--- a/eagle-security/eagle-security-hbase-auditlog/pom.xml
+++ b/eagle-security/eagle-security-hbase-auditlog/pom.xml
@@ -39,5 +39,10 @@
             <artifactId>eagle-stream-application-manager</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
new file mode 100644
index 0000000..49393e1
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hbase;
+
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import storm.kafka.StringScheme;
+import storm.kafka.bolt.KafkaBolt;
+
+/**
+ * Since 7/27/16.
+ */
+public class HBaseAuditLogApplication{
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
+    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+    protected void buildApp(TopologyBuilder builder) {
+        System.setProperty("config.resource", "/application.conf");
+        Config config = ConfigFactory.load();
+        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+
+        HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks);
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
+        boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+
+        HbaseResourceSensitivityDataJoinBolt joinBolt = new HbaseResourceSensitivityDataJoinBolt(config);
+        BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
+        joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
+        KafkaBolt kafkaBolt = new KafkaBolt();
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.shuffleGrouping("joinBolt");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
new file mode 100644
index 0000000..4b48c0a
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>HBaseAuditLogApplication</type>
+    <name>HBase Audit Log Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.security.hbase.HBaseAuditLogApplication</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>message</name>
+            <displayName>Message</displayName>
+            <value>Hello, example application!</value>
+            <description>Just a sample configuration property</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>hbase_audit_log_stream</streamId>
+            <description>HBase Audit Log Stream</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>action</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>status</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up a data collector to flow data into the kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as following:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index b2fb0d2..6617693 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -146,4 +146,4 @@
             </resource>
         </resources>
     </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccae27f..d103c88 100755
--- a/pom.xml
+++ b/pom.xml
@@ -273,7 +273,7 @@
         <javax.mail.version>1.4</javax.mail.version>
         <extcos4.version>0.4b</extcos4.version>
         <extcos3.version>0.3b</extcos3.version>
-        <siddhi.version>3.0.5</siddhi.version>
+        <siddhi.version>3.1.1</siddhi.version>
 
         <!-- Testing -->
         <junit.version>4.12</junit.version>