You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/05 16:05:27 UTC
incubator-eagle git commit: stream tumbling window aggregate library
Author: yonzhang2012@apache.org Reviewer: Hao Chen
https://issues.apache.org/jira/browse/EAGLE-410
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 2115d0ad5 -> 1d842563a
stream tumbling window aggregate library
Author: yonzhang2012@apache.org
Reviewer: Hao Chen
https://issues.apache.org/jira/browse/EAGLE-410
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1d842563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1d842563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1d842563
Branch: refs/heads/develop
Commit: 1d842563ae7a7d0416a58324eb75760dbdc8d06f
Parents: 2115d0a
Author: yonzhang <yo...@gmail.com>
Authored: Fri Aug 5 09:08:57 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Fri Aug 5 09:08:57 2016 -0700
----------------------------------------------------------------------
.../alert/siddhi/TestExternalBatchWindow.java | 36 ++--
.../src/main/resources/eagle.siddhiext | 3 +-
.../impl/ApplicationProviderSPILoader.java | 6 +-
eagle-core/eagle-common/pom.xml | 4 +
.../java/org/apache/eagle/common/agg/Agg.java | 26 +++
.../eagle/common/agg/AggregateHandler.java | 29 +++
.../eagle/common/agg/AggregateResult.java | 61 ++++++
.../org/apache/eagle/common/agg/Column.java | 25 +++
.../org/apache/eagle/common/agg/Groupby.java | 26 +++
.../eagle/common/agg/SiddhiAggregator.java | 143 ++++++++++++++
.../eagle/common/agg/StreamAggregator.java | 101 ++++++++++
.../eagle/common/agg/StreamDefinition.java | 26 +++
.../eagle/common/agg/TimeBatchAggSpec.java | 27 +++
.../eagle/common/agg/TimeBatchWindowSpec.java | 31 ++++
.../eagle/common/TestSiddhiAggregator.java | 111 +++++++++++
.../eagle/common/agg/TestSiddhiAggregator.java | 93 ++++++++++
.../common/agg/TestSiddhiExternalTimeBatch.java | 114 ++++++++++++
.../eagle/common/agg/TestStreamAggregator.java | 65 +++++++
.../ExternalTimeBatchWindowProcessor.java | 184 -------------------
.../src/main/resources/eagle.siddhiext | 2 -
.../hadoop/metric/TestHadoopMetricSiddhiQL.java | 43 ++++-
.../eagle-security-hbase-auditlog/pom.xml | 5 +
.../hbase/HBaseAuditLogApplication.java | 62 +++++++
.../src/main/resources/metadata.xml | 91 +++++++++
eagle-server/pom.xml | 2 +-
pom.xml | 2 +-
26 files changed, 1106 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
index 7903e0d..e289793 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
@@ -41,12 +41,12 @@ import org.wso2.siddhi.core.stream.output.StreamCallback;
public class TestExternalBatchWindow {
private static SiddhiManager siddhiManager;
-
+
@BeforeClass
public static void beforeClass() {
siddhiManager = new SiddhiManager();
}
-
+
@AfterClass
public static void afterClass() {
siddhiManager.shutdown();
@@ -75,7 +75,7 @@ public class TestExternalBatchWindow {
for (int i = 0; i < length; i++) {
input.send(new Object[] { 15, now + i * 1000 });
}
-
+
Thread.sleep(1000);
Assert.assertFalse("Event happens inner external time batch window, should not have event recieved in callback!", recieved.get());
@@ -83,9 +83,9 @@ public class TestExternalBatchWindow {
}
private ExecutionPlanRuntime simpleQueryRuntime() {
- String query = "define stream jmxMetric(cpu int, timestamp long); "
+ String query = "define stream jmxMetric(cpu int, timestamp long); "
+ "@info(name='query')"
- + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) "
+ + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
+ "select avg(cpu) as avgCpu, count(1) as count insert into tmp;";
return siddhiManager.createExecutionPlanRuntime(query);
@@ -108,7 +108,7 @@ public class TestExternalBatchWindow {
public void test05ExternalJoin() {
// TODO
}
-
+
@Test
public void test06EdgeCase() throws Exception {
// every 10 sec
@@ -141,7 +141,7 @@ public class TestExternalBatchWindow {
for (int i = 0; i < length; i++) {
input.send(new Object[] { 15, now + i * 10 });
}
-
+
// second round
// if the trigger event mix with the last window, we should see the avgValue is not expected
for (int i = 0; i < length; i++) {
@@ -149,11 +149,11 @@ public class TestExternalBatchWindow {
}
// to trigger second round
input.send(new Object[] { 10000, now + 10 * 10000 });
-
+
// latch.await();// for debug
Thread.sleep(1000);
-
+
Assert.assertEquals(2, recCount.get());
}
@@ -162,11 +162,11 @@ public class TestExternalBatchWindow {
String defaultStream = "define stream LoginEvents (myTime long, ip string, phone string,price int);";
String query = " @info(name='pull76') "
- + " from LoginEvents#window.eagle:externalTimeBatch(myTime, 5 sec) "
+ + " from LoginEvents#window.externalTimeBatch(myTime, 5 sec) "
+ " select myTime, phone, ip, price, count(ip) as cntip , "
+ " min(myTime) as mintime, max(myTime) as maxtime "
+ " insert into events ;";
-
+
ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(defaultStream + query);
InputHandler inputHandler = runtime.getInputHandler("LoginEvents");
@@ -194,10 +194,10 @@ public class TestExternalBatchWindow {
}
}
});
-
-
+
+
runtime.start();
-
+
long start = System.currentTimeMillis();
Calendar c = Calendar.getInstance();
c.add(Calendar.HOUR, 1);
@@ -214,12 +214,12 @@ public class TestExternalBatchWindow {
Thread.sleep(1000);
runtime.shutdown();
}
-
+
@Test
public void test01DownSampling() throws Exception {
String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
- String query = "@info(name = 'downSample') "
- + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) "
+ String query = "@info(name = 'downSample') "
+ + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
+ "select "
+ "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
+ " '|' as s, "
@@ -227,7 +227,7 @@ public class TestExternalBatchWindow {
+ " '|' as s1, "
+ " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
+ " '|' as s2, "
- + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, "
+ + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, "
+ " '|' as s3, "
+ " timestamp as timeWindowEnds, "
+ " '|' as s4, "
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
index 506bad9..4ce9805 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
@@ -15,5 +15,4 @@
# limitations under the License.
#
-# externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
-collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
\ No newline at end of file
+collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
index 42285b3..977b017 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -67,17 +67,17 @@ public class ApplicationProviderSPILoader extends ApplicationProviderLoader{
} else {
LOG.info("Loading application providers from context class loader");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- loadProviderFromClassLoader(classLoader,(applicationProviderConfig) -> DynamicJarPathFinder.findPath(applicationProviderConfig.getClass()));
+ loadProviderFromClassLoader(classLoader,(applicationProvider) -> DynamicJarPathFinder.findPath(applicationProvider.getClass()));
}
}
- private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProviderConfig,String> jarFileSupplier){
+ private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProvider,String> jarFileSupplier){
ServiceLoader<ApplicationProvider> serviceLoader = ServiceLoader.load(ApplicationProvider.class, jarFileClassLoader);
for (ApplicationProvider applicationProvider : serviceLoader) {
try {
ApplicationProviderConfig providerConfig = new ApplicationProviderConfig();
providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
- providerConfig.setJarPath(jarFileSupplier.apply(providerConfig));
+ providerConfig.setJarPath(jarFileSupplier.apply(applicationProvider));
applicationProvider.prepare(providerConfig, getConfig());
registerProvider(applicationProvider);
}catch (Throwable ex){
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/pom.xml b/eagle-core/eagle-common/pom.xml
index 33ed55d..4591324 100644
--- a/eagle-core/eagle-common/pom.xml
+++ b/eagle-core/eagle-common/pom.xml
@@ -32,6 +32,10 @@
<dependencies>
<dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java
new file mode 100644
index 0000000..5e26fd2
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Agg.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+/**
+ * Since 8/3/16.
+ */
+public class Agg {
+ String field;
+ String function;
+ String alias;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java
new file mode 100644
index 0000000..59324c2
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateHandler.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Since 8/4/16.
+ */
+public interface AggregateHandler {
+ void onAggregate(List<AggregateResult> result);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java
new file mode 100644
index 0000000..bb0ac5d
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/AggregateResult.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 8/4/16.
+ * schema is : gbfield, gbfield, ..., aggField, aggField, ...
+ */
+public class AggregateResult {
+ private Object[] data;
+ private Map<String, Integer> colIndices;
+ private List<String> colNames;
+ public AggregateResult(Object[] data, Map<String, Integer> colIndices, List<String> colNames){
+ this.data = data;
+ this.colIndices = colIndices;
+ this.colNames = colNames;
+ }
+
+ public Object get(int index){
+ return data[index];
+ }
+
+ public Object get(String fieldName){
+ int index = colIndices.get(fieldName);
+ return get(index);
+ }
+
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ for(int i=0; i<data.length; i++){
+ sb.append(colNames.get(i));
+ sb.append("=");
+ sb.append(data[i]);
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append("}");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java
new file mode 100644
index 0000000..00a11d6
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Column.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+/**
+ * Since 8/3/16.
+ */
+public class Column {
+ String name;
+ String type;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java
new file mode 100644
index 0000000..a74d677
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/Groupby.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Since 8/3/16.
+ */
+public class Groupby {
+ List<String> cols;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
new file mode 100644
index 0000000..d0b3987
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/SiddhiAggregator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 8/3/16.
+ */
+public class SiddhiAggregator {
+ private static final Logger LOG = LoggerFactory.getLogger(SiddhiAggregator.class);
+ private TimeBatchWindowSpec spec;
+ private StreamDefinition sd;
+ private InputHandler input;
+ public SiddhiAggregator(TimeBatchWindowSpec spec, StreamDefinition sd, final AggregateHandler handler){
+ this.spec = spec;
+ this.sd = sd;
+
+ Map<String, Integer> colIndices = new HashMap<>();
+ List<String> colNames = new ArrayList<>();
+ int i = 0;
+ for(String col : spec.groupby.cols){
+ colIndices.put(col, i++);
+ colNames.add(col);
+ }
+ for(Agg agg : spec.aggs){
+ colIndices.put(agg.alias, i++);
+ colNames.add(agg.alias);
+ }
+
+ String query = buildSiddhiAggQuery();
+ SiddhiManager sm = new SiddhiManager();
+ ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(query);
+
+ input = runtime.getInputHandler("s");
+
+ runtime.addCallback("query", new QueryCallback() {
+ @Override
+ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+ List<AggregateResult> rows = new ArrayList<AggregateResult>();
+ for(Event e : inEvents) {
+ AggregateResult result = new AggregateResult(e.getData(), colIndices, colNames);
+ rows.add(result);
+ }
+ handler.onAggregate(rows);
+ }
+ });
+ runtime.start();
+ }
+
+ public void add(Object[] data) throws Exception{
+ input.send(data);
+ }
+
+ /**
+ * example siddhi query
+ * String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
+ "@info(name='query') " +
+ " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
+ * @return
+ */
+ private String buildSiddhiAggQuery(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("define stream s(");
+ if(sd.columns == null || sd.columns.size() == 0) {
+ throw new IllegalStateException("input stream should contains at least one column");
+ }
+ for(Column col : sd.columns){
+ appendColumnDef(sb, col);
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length()-1);
+ sb.append(");");
+
+ sb.append(" @info(name='query') ");
+ sb.append("from s[");
+ sb.append(spec.filter);
+ sb.append("]#window.externalTimeBatch(");
+ sb.append(spec.timestampColumn);
+ sb.append(",");
+ sb.append(spec.windowDuration);
+ sb.append(",");
+ sb.append(spec.start);
+ sb.append(")");
+ sb.append(" select ");
+ for(String gbField : spec.groupby.cols){
+ sb.append(gbField);
+ sb.append(",");
+ }
+ if(spec.aggs == null){
+ throw new IllegalStateException("at least one aggregate function should be present");
+ }
+ for(Agg agg : spec.aggs){
+ sb.append(agg.function);
+ sb.append("(");
+ sb.append(agg.field);
+ sb.append(") as ");
+ sb.append(agg.alias);
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(" group by ");
+ for(String gbField : spec.groupby.cols){
+ sb.append(gbField);
+ sb.append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append(" insert into tmp;");
+ LOG.info("query : " + sb.toString());
+ return sb.toString();
+ }
+
+ private void appendColumnDef(StringBuilder sb, Column col){
+ sb.append(col.name);
+ sb.append(" ");
+ sb.append(col.type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java
new file mode 100644
index 0000000..37802fd
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamAggregator.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Since 8/4/16.
+ */
+public class StreamAggregator {
+ public static StreamAggregatorBuilder builder(){
+ return new StreamAggregatorBuilder();
+ }
+ public static class StreamAggregatorBuilder{
+ private Groupby gb = new Groupby();
+ private String windowDuration;
+ private long start;
+ private String filter;
+ private List<Agg> aggs = new ArrayList<>();
+ private String timestampColumn;
+ private StreamDefinition sd = new StreamDefinition();
+ private AggregateHandler handler;
+ public StreamAggregatorBuilder groupby(String ... gbFields){
+ gb.cols = Arrays.asList(gbFields);
+ return this;
+ }
+ public StreamAggregatorBuilder window(String windowDuration){
+ window(windowDuration, 0);
+ return this;
+ }
+ public StreamAggregatorBuilder window(String windowDuration, long start){
+ this.windowDuration = windowDuration;
+ this.start = start;
+ return this;
+ }
+ public StreamAggregatorBuilder timeColumn(String timestampColumn){
+ this.timestampColumn = timestampColumn;
+ return this;
+ }
+ public StreamAggregatorBuilder filter(String filter){
+ this.filter = filter;
+ return this;
+ }
+ public StreamAggregatorBuilder agg(String function, String field, String alias){
+ Agg agg = new Agg();
+ agg.function = function;
+ agg.field = field;
+ agg.alias = alias;
+ aggs.add(agg);
+ return this;
+ }
+ public StreamAggregatorBuilder columnDef(String colName, String colType){
+ Column col = new Column();
+ col.name = colName;
+ col.type = colType;
+ if(sd.columns == null){
+ sd.columns = new ArrayList<>();
+ }
+ sd.columns.add(col);
+ return this;
+ }
+ public StreamAggregatorBuilder streamDef(StreamDefinition sd){
+ this.sd = sd;
+ return this;
+ }
+ public StreamAggregatorBuilder aggregateHandler(AggregateHandler handler){
+ this.handler = handler;
+ return this;
+ }
+ public SiddhiAggregator build(){
+ TimeBatchWindowSpec spec = new TimeBatchWindowSpec();
+ spec.aggs = aggs;
+ spec.filter = filter;
+ spec.groupby = gb;
+ spec.start = start;
+ spec.timestampColumn = timestampColumn;
+ spec.windowDuration = windowDuration;
+ SiddhiAggregator aggregator = new SiddhiAggregator(spec, sd, handler);
+ return aggregator;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java
new file mode 100644
index 0000000..1489496
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/StreamDefinition.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Since 8/3/16.
+ */
+public class StreamDefinition {
+ List<Column> columns; // column sequence is significant
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java
new file mode 100644
index 0000000..fdb269a
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchAggSpec.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+/**
+ * Since 8/3/16.
+ */
+public class TimeBatchAggSpec {
+ long batchSize;
+ long offset;
+ Groupby groupby;
+ Agg agg;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java
new file mode 100644
index 0000000..6b58b3a
--- /dev/null
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/agg/TimeBatchWindowSpec.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common.agg;
+
+import java.util.List;
+
+/**
+ * Since 8/3/16.
+ */
+public class TimeBatchWindowSpec {
+ Groupby groupby;
+ List<Agg> aggs;
+ String filter;
+ String timestampColumn;
+ String windowDuration;
+ long start;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
new file mode 100644
index 0000000..86d6016
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSiddhiAggregator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Since 8/3/16.
+ */
+public class TestSiddhiAggregator {
+ @Test
+ public void testSiddhi() throws Exception{
+ String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
+ " @info(name='query') " +
+ " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
+ SiddhiManager sm = new SiddhiManager();
+ ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+
+ InputHandler input = runtime.getInputHandler("s");
+
+ AtomicInteger index = new AtomicInteger(0);
+
+ runtime.addCallback("query", new QueryCallback() {
+ @Override
+ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+ printEvents(inEvents);
+ if(index.get() == 0){
+ Assert.assertEquals(3, inEvents.length);
+ Assert.assertEquals("host1", inEvents[0].getData()[0]);
+ Assert.assertEquals(3L, inEvents[0].getData()[1]);
+ Assert.assertEquals("host2", inEvents[1].getData()[0]);
+ Assert.assertEquals(4L, inEvents[1].getData()[1]);
+ Assert.assertEquals("host3", inEvents[2].getData()[0]);
+ Assert.assertEquals(2L, inEvents[2].getData()[1]);
+ index.incrementAndGet();
+ }else if(index.get() == 1){
+ Assert.assertEquals(3, inEvents.length);
+ Assert.assertEquals("host1", inEvents[0].getData()[0]);
+ Assert.assertEquals(1L, inEvents[0].getData()[1]);
+ Assert.assertEquals("host2", inEvents[1].getData()[0]);
+ Assert.assertEquals(2L, inEvents[1].getData()[1]);
+ Assert.assertEquals("host3", inEvents[2].getData()[0]);
+ Assert.assertEquals(2L, inEvents[2].getData()[1]);
+ index.incrementAndGet();
+ }
+ }
+ });
+ runtime.start();
+
+ sendEvents(3, 4, 2, input, 1000L);
+ Thread.sleep(1000);
+ sendEvents(1, 2, 2, input, 61000L);
+ sendEvents(3, 10, 7, input, 121000L);
+ runtime.shutdown();
+ sm.shutdown();
+ Thread.sleep(1000);
+ }
+
+ void sendEvents(int countHost1, int countHost2, int countHost3, InputHandler input, long startTime) throws Exception{
+ for(int i=0; i<countHost1; i++){
+ Event e = createEvent("host1", startTime + i*100);
+ input.send(e);
+ }
+ startTime += 2000;
+ for(int i=0; i<countHost2; i++){
+ Event e = createEvent("host2", startTime + i*100);
+ input.send(e);
+ }
+ startTime += 4000;
+ for(int i=0; i<countHost3; i++){
+ Event e = createEvent("host3", startTime + i*100);
+ input.send(e);
+ }
+ }
+
+ void printEvents(Event[] inEvents){
+ for(Event e : inEvents) {
+ System.out.print(e);
+ System.out.print(",");
+ }
+ System.out.println();
+ }
+ Event createEvent(String host, long timestamp){
+ Event e = new Event();
+ e.setTimestamp(timestamp);
+ e.setData(new Object[]{host, timestamp, "missingblocks", "site1", 14.0});
+ return e;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java
new file mode 100644
index 0000000..75b50ea
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiAggregator.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Since 8/4/16.
+ */
+public class TestSiddhiAggregator {
+ @Test
+ public void test() throws Exception{
+ TimeBatchWindowSpec spec = new TimeBatchWindowSpec();
+ Agg agg = new Agg();
+ agg.field = "value";
+ agg.function = "avg";
+ agg.alias = "avg";
+ spec.aggs = Arrays.asList(agg);
+ spec.filter = "metric==\"missingblocks\"";
+ Groupby gb = new Groupby();
+ gb.cols = Arrays.asList("host");
+ spec.groupby = gb;
+ spec.start = 0L;
+ spec.timestampColumn = "timestamp";
+ spec.windowDuration = "1 min";
+
+ StreamDefinition sd = new StreamDefinition();
+ List<Column> columns = new ArrayList<>();
+ sd.columns = columns;
+ Column host = new Column();
+ host.name = "host";
+ host.type = "string";
+ columns.add(host);
+ Column timestamp = new Column();
+ timestamp.name = "timestamp";
+ timestamp.type = "long";
+ columns.add(timestamp);
+ Column metric = new Column();
+ metric.name = "metric";
+ metric.type = "string";
+ columns.add(metric);
+ Column site = new Column();
+ site.name = "site";
+ site.type = "string";
+ columns.add(site);
+ Column value = new Column();
+ value.name = "value";
+ value.type = "double";
+ columns.add(value);
+
+ SiddhiAggregator aggregator = new SiddhiAggregator(spec, sd, new AggregateHandler() {
+ @Override
+ public void onAggregate(List<AggregateResult> result) {
+ System.out.println(result);
+ }
+ });
+
+ aggregator.add(new Object[]{"host1", 1000L, "missingblocks", "site1", 10.0});
+ aggregator.add(new Object[]{"host2", 2000L, "missingblocks", "site1", 16.0});
+ aggregator.add(new Object[]{"host3", 2000L, "missingblocks", "site1", 11.0});
+ aggregator.add(new Object[]{"host1", 21000L, "missingblocks", "site1", 20.0});
+
+ aggregator.add(new Object[]{"host1", 61000L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host2", 61500L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host3", 62000L, "missingblocks", "site1", 13.0});
+ aggregator.add(new Object[]{"host2", 63500L, "missingblocks", "site1", 19.0});
+
+ aggregator.add(new Object[]{"host1", 121000L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host2", 121000L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host3", 122000L, "missingblocks", "site1", 13.0});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
new file mode 100644
index 0000000..e59c0c4
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestSiddhiExternalTimeBatch.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.common.agg;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Since 8/3/16.
+ */
+public class TestSiddhiExternalTimeBatch {
+ @Test
+ public void testSiddhi() throws Exception{
+ String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
+ " @info(name='query') " +
+ " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
+ System.out.println("query: " + ql);
+ SiddhiManager sm = new SiddhiManager();
+ ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
+
+ InputHandler input = runtime.getInputHandler("s");
+
+ AtomicInteger index = new AtomicInteger(0);
+
+ runtime.addCallback("query", new QueryCallback() {
+ @Override
+ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+ printEvents(inEvents);
+ if(index.get() == 0){
+ Assert.assertEquals(3, inEvents.length);
+ Assert.assertEquals("host1", inEvents[0].getData()[0]);
+ Assert.assertEquals(3L, inEvents[0].getData()[1]);
+ Assert.assertEquals("host2", inEvents[1].getData()[0]);
+ Assert.assertEquals(4L, inEvents[1].getData()[1]);
+ Assert.assertEquals("host3", inEvents[2].getData()[0]);
+ Assert.assertEquals(2L, inEvents[2].getData()[1]);
+ index.incrementAndGet();
+ }else if(index.get() == 1){
+ Assert.assertEquals(3, inEvents.length);
+ Assert.assertEquals("host1", inEvents[0].getData()[0]);
+ Assert.assertEquals(1L, inEvents[0].getData()[1]);
+ Assert.assertEquals("host2", inEvents[1].getData()[0]);
+ Assert.assertEquals(2L, inEvents[1].getData()[1]);
+ Assert.assertEquals("host3", inEvents[2].getData()[0]);
+ Assert.assertEquals(2L, inEvents[2].getData()[1]);
+ index.incrementAndGet();
+ }
+ }
+ });
+ runtime.start();
+
+ sendEvents(3, 4, 2, input, 1000L);
+ Thread.sleep(1000);
+ sendEvents(1, 2, 2, input, 61000L);
+ sendEvents(3, 10, 7, input, 121000L);
+ runtime.shutdown();
+ sm.shutdown();
+ Thread.sleep(1000);
+ }
+
+ void sendEvents(int countHost1, int countHost2, int countHost3, InputHandler input, long startTime) throws Exception{
+ for(int i=0; i<countHost1; i++){
+ Event e = createEvent("host1", startTime + i*100);
+ input.send(e);
+ }
+ startTime += 2000;
+ for(int i=0; i<countHost2; i++){
+ Event e = createEvent("host2", startTime + i*100);
+ input.send(e);
+ }
+ startTime += 4000;
+ for(int i=0; i<countHost3; i++){
+ Event e = createEvent("host3", startTime + i*100);
+ input.send(e);
+ }
+ }
+
+ void printEvents(Event[] inEvents){
+ for(Event e : inEvents) {
+ System.out.print(e);
+ System.out.print(",");
+ }
+ System.out.println();
+ }
+ Event createEvent(String host, long timestamp){
+ Event e = new Event();
+ e.setTimestamp(timestamp);
+ e.setData(new Object[]{host, timestamp, "missingblocks", "site1", 14.0});
+ return e;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java
new file mode 100644
index 0000000..76b2264
--- /dev/null
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/agg/TestStreamAggregator.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common.agg;
+
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Since 8/4/16.
+ */
+public class TestStreamAggregator {
+ @Test
+ public void test() throws Exception{
+ SiddhiAggregator aggregator = StreamAggregator.builder()
+ .columnDef("host", "string")
+ .columnDef("timestamp", "long")
+ .columnDef("metric", "string")
+ .columnDef("site", "string")
+ .columnDef("value", "double")
+ .filter("metric==\"missingblocks\"")
+ .groupby("host")
+ .agg("avg", "value", "avg")
+ .timeColumn("timestamp")
+ .window("1 min", 0)
+ .aggregateHandler(new AggregateHandler() {
+ @Override
+ public void onAggregate(List<AggregateResult> result) {
+ System.out.println(result);
+ }
+ })
+ .build();
+
+ aggregator.add(new Object[]{"host1", 1000L, "missingblocks", "site1", 10.0});
+ aggregator.add(new Object[]{"host2", 2000L, "missingblocks", "site1", 16.0});
+ aggregator.add(new Object[]{"host3", 2000L, "missingblocks", "site1", 11.0});
+ aggregator.add(new Object[]{"host1", 21000L, "missingblocks", "site1", 20.0});
+
+ aggregator.add(new Object[]{"host1", 61000L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host2", 61500L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host3", 62000L, "missingblocks", "site1", 13.0});
+ aggregator.add(new Object[]{"host2", 63500L, "missingblocks", "site1", 19.0});
+
+ aggregator.add(new Object[]{"host1", 121000L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host2", 121000L, "missingblocks", "site1", 14.0});
+ aggregator.add(new Object[]{"host3", 122000L, "missingblocks", "site1", 13.0});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
deleted file mode 100644
index c70d1a1..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/extension/ExternalTimeBatchWindowProcessor.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi.extension;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.event.ComplexEvent;
-import org.wso2.siddhi.core.event.ComplexEventChunk;
-import org.wso2.siddhi.core.event.MetaComplexEvent;
-import org.wso2.siddhi.core.event.stream.StreamEvent;
-import org.wso2.siddhi.core.event.stream.StreamEventCloner;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
-import org.wso2.siddhi.core.query.processor.Processor;
-import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
-import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
-import org.wso2.siddhi.core.table.EventTable;
-import org.wso2.siddhi.core.util.collection.operator.Finder;
-import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-import org.wso2.siddhi.query.api.expression.Expression;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @since Dec 23, 2015
- *
- */
-
-public class ExternalTimeBatchWindowProcessor extends WindowProcessor implements FindableProcessor {
-
- private long timeToKeep;
-
- private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<StreamEvent>();
- private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk<StreamEvent>();
-
- static final Logger log = LoggerFactory.getLogger(ExternalTimeBatchWindowProcessor.class);
- private VariableExpressionExecutor timeStampVariableExpressionExecutor;
-
- private long lastSendTime = -1;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- this.expiredEventChunk = new ComplexEventChunk<StreamEvent>();
- if (attributeExpressionExecutors.length == 2) {
- if (attributeExpressionExecutors[1].getReturnType() == Attribute.Type.INT) {
- timeToKeep = Integer.parseInt(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue()));
- } else {
- timeToKeep = Long.parseLong(String.valueOf(((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue()));
- }
- if (!(attributeExpressionExecutors[0] instanceof VariableExpressionExecutor)) {
- throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be a type long stream attribute but found " + attributeExpressionExecutors[0].getClass());
- }
- timeStampVariableExpressionExecutor = ((VariableExpressionExecutor) attributeExpressionExecutors[0]);
- if (timeStampVariableExpressionExecutor.getReturnType() != Attribute.Type.LONG) {
- throw new ExecutionPlanValidationException("ExternalTime window's 1st parameter timeStamp should be type long, but found " + timeStampVariableExpressionExecutor.getReturnType());
- }
- } else {
- throw new ExecutionPlanValidationException("ExternalTime window should only have two parameter (<long> timeStamp, <int|long|time> windowTime), but found " + attributeExpressionExecutors.length + " input attributes");
- }
- }
-
- /**
- * Here an assumption is taken:
- * Parameter: timestamp: The time which the window determines as current time and will act upon,
- * the value of this parameter should be monotonically increasing.
- * from https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-externalTime
- *
- */
- @Override
- protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
- // event incoming trigger process. No events means no action
- if (!streamEventChunk.hasNext()) {
- return;
- }
-
- // for window beginning, if window is empty, set lastSendTime to incomingChunk first.
- if (currentEventChunk.getFirst() == null && lastSendTime < 0) {
- lastSendTime = (Long) streamEventChunk.getFirst().getAttribute(timeStampVariableExpressionExecutor.getPosition());
- }
-
- while(streamEventChunk.hasNext()) {
- StreamEvent currStreamEvent = streamEventChunk.next();
- if (currStreamEvent.getType() != ComplexEvent.Type.CURRENT) {
- continue;
- }
-
- long currentTime = (Long) currStreamEvent.getAttribute(timeStampVariableExpressionExecutor.getPosition());
- if (currentTime < lastSendTime + timeToKeep) {
- cloneAppend(streamEventCloner, currStreamEvent);
- } else if (currentTime >= lastSendTime + timeToKeep) {
- flushCurentChunk(nextProcessor, streamEventCloner, currentTime);
- cloneAppend(streamEventCloner, currStreamEvent);
- }
- }
- }
-
- private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent currStreamEvent) {
- StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(currStreamEvent);
- currentEventChunk.add(clonedStreamEvent);
- }
-
- private void flushCurentChunk(Processor nextProcessor, StreamEventCloner streamEventCloner, long currentTime) {
- // need flush the currentEventChunk
- currentEventChunk.reset();
- ComplexEventChunk<StreamEvent> newEventChunk = new ComplexEventChunk<StreamEvent>();
-
- // mark the timestamp for the expiredType event
- while (expiredEventChunk.hasNext()) {
- StreamEvent expiredEvent = expiredEventChunk.next();
- expiredEvent.setTimestamp(currentTime);
- }
- // add expired event to newEventChunk too.
- if (expiredEventChunk.getFirst() != null) {
- newEventChunk.add(expiredEventChunk.getFirst());
- }
-
- // make current event chunk as expired in expiredChunk
- expiredEventChunk.clear();
- while (currentEventChunk.hasNext()) {
- StreamEvent currentEvent = currentEventChunk.next();
- StreamEvent toExpireEvent = streamEventCloner.copyStreamEvent(currentEvent);
- toExpireEvent.setType(StreamEvent.Type.EXPIRED);
- expiredEventChunk.add(toExpireEvent);
- }
-
- // add current event chunk to next processor
- if (currentEventChunk.getFirst() != null) {
- newEventChunk.add(currentEventChunk.getFirst());
- }
- currentEventChunk.clear();
-
- // update timestamp, call next processor
- lastSendTime = currentTime;
- if (newEventChunk.getFirst() != null) {
- nextProcessor.process(newEventChunk);
- }
- }
-
- public void start() {
- //Do nothing
- }
-
- public void stop() {
- //Do nothing
- }
-
- public Object[] currentState() {
- return new Object[]{currentEventChunk, expiredEventChunk};
- }
-
- @SuppressWarnings("unchecked")
- public void restoreState(Object[] state) {
- currentEventChunk = (ComplexEventChunk<StreamEvent>) state[0];
- expiredEventChunk = (ComplexEventChunk<StreamEvent>) state[1];
- }
-
- public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder finder) {
- return finder.find(matchingEvent, expiredEventChunk, streamEventCloner);
- }
-
- public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
- return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
index 2671c31..cce3aca 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
@@ -14,5 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
index 1a18655..c8096d6 100644
--- a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
@@ -117,7 +117,7 @@ public class TestHadoopMetricSiddhiQL {
latch.await(10, TimeUnit.SECONDS);
Thread.sleep(3000);
- System.out.println(count.get());
+ System.out.println("callback count=" + count.get());
if (eventHappenCount >= 0) {
Assert.assertEquals(eventHappenCount, count.get());
} else {
@@ -310,4 +310,45 @@ public class TestHadoopMetricSiddhiQL {
return events;
}
+ @Test
+ public void testNoActiveNamenodeFor3Times() throws Exception {
+ String sql = " define stream s (host string, timestamp long, metric string, component string, site string, value double); " +
+ " @info(name='query') " +
+ " from s[metric == \"hadoop.namenode.hastate.active.count\"]#window.length(3) select metric, host, value, timestamp, component, site, avg(convert(value, \"long\")) as avgValue, count() as cnt having avgValue==0 and cnt==3 insert into tmp;";
+// " from s[metric == \"hadoop.namenode.hastate.active.count\"]#window.length(3) select metric, host, value, timestamp, component, site, min(convert(value, \"long\")) as minValue, max(convert(value, \"long\")) as maxValue, count() as cnt having minValue==0 and maxValue==0 and cnt==3 insert into tmp;";
+
+ System.out.println(sql);
+
+ testQL(sql, generateMBEvents_times_0(1), 0);
+ testQL(sql, generateMBEvents_times_0(2), 0);
+ testQL(sql, generateMBEvents_times_0(3), 1);
+ }
+
+ private List<Event> generateMBEvents_times_0(int times_0) {
+ List<Event> events = new LinkedList<>();
+
+ long base1 = System.currentTimeMillis();
+ double[] values = new double[3];
+ if(times_0 == 1){
+ values[0] = 1.0;
+ values[1] = 0.0;
+ values[2] = 1.0;
+ }else if(times_0 == 2){
+ values[0] = 1.0;
+ values[1] = 0.0;
+ values[2] = 0.0;
+ }else if(times_0 == 3){
+ values[0] = 0.0;
+ values[1] = 0.0;
+ values[2] = 0.0;
+ }
+ for(int i=0; i<3; i++) {
+ // master / slave in sync
+ base1 = base1 + 1000;
+ Event e = new Event();
+ e.setData(new Object[]{"a", base1, "hadoop.namenode.hastate.active.count", "namenode", "sandbox", values[i]});
+ events.add(e);
+ }
+ return events;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-security/eagle-security-hbase-auditlog/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/pom.xml b/eagle-security/eagle-security-hbase-auditlog/pom.xml
index a8b82c7..46d67b5 100644
--- a/eagle-security/eagle-security-hbase-auditlog/pom.xml
+++ b/eagle-security/eagle-security-hbase-auditlog/pom.xml
@@ -39,5 +39,10 @@
<artifactId>eagle-stream-application-manager</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
new file mode 100644
index 0000000..49393e1
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hbase;
+
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import storm.kafka.StringScheme;
+import storm.kafka.bolt.KafkaBolt;
+
+/**
+ * Since 7/27/16.
+ */
+public class HBaseAuditLogApplication{
+ public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+ public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
+ public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+ protected void buildApp(TopologyBuilder builder) {
+ System.setProperty("config.resource", "/application.conf");
+ Config config = ConfigFactory.load();
+ NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+ IRichSpout spout = provider.getSpout(config);
+
+ HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
+
+ int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+ int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+ int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
+ int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+ builder.setSpout("ingest", spout, numOfSpoutTasks);
+ BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
+ boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+
+ HbaseResourceSensitivityDataJoinBolt joinBolt = new HbaseResourceSensitivityDataJoinBolt(config);
+ BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
+ joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
+ KafkaBolt kafkaBolt = new KafkaBolt();
+ BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
+ kafkaBoltDeclarer.shuffleGrouping("joinBolt");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
new file mode 100644
index 0000000..4b48c0a
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>HBaseAuditLogApplication</type>
+ <name>HBase Audit Log Monitoring Application</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.security.hbase.HBaseAuditLogApplication</appClass>
+ <viewPath>/apps/example</viewPath>
+ <configuration>
+ <property>
+ <name>message</name>
+ <displayName>Message</displayName>
+ <value>Hello, example application!</value>
+ <description>Just an sample configuration property</description>
+ </property>
+ </configuration>
+ <streams>
+ <stream>
+ <streamId>hbase_audit_log_stream</streamId>
+ <description>HBase Audit Log Stream</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>action</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>host</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>status</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
+ <docs>
+ <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
+
+# Step 2: Set up data collector to flow data into kafka topic in
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as following:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+ </install>
+ <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index b2fb0d2..6617693 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -146,4 +146,4 @@
</resource>
</resources>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1d842563/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ccae27f..d103c88 100755
--- a/pom.xml
+++ b/pom.xml
@@ -273,7 +273,7 @@
<javax.mail.version>1.4</javax.mail.version>
<extcos4.version>0.4b</extcos4.version>
<extcos3.version>0.3b</extcos3.version>
- <siddhi.version>3.0.5</siddhi.version>
+ <siddhi.version>3.1.1</siddhi.version>
<!-- Testing -->
<junit.version>4.12</junit.version>