You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/05 09:38:32 UTC
incubator-eagle git commit: [EAGLE-818] implement CEPFunction
Repository: incubator-eagle
Updated Branches:
refs/heads/master 1c81c0865 -> f899ca1c2
[EAGLE-818] implement CEPFunction
https://issues.apache.org/jira/browse/EAGLE-818
Author: Qingwen Zhao <qi...@gmail.com>
Author: Zhao, Qingwen <qi...@apache.org>
Closes #709 from qingwen220/EAGLE-818.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f899ca1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f899ca1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f899ca1c
Branch: refs/heads/master
Commit: f899ca1c29533f571e1d2abcfa93e164519937c6
Parents: 1c81c08
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Mon Dec 5 17:38:24 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Dec 5 17:38:24 2016 +0800
----------------------------------------------------------------------
.../app/environment/builder/CEPFunction.java | 69 ++++++++++++++++++--
.../eagle/app/stream/CEPFunctionTest.java | 20 ++++--
2 files changed, 78 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f899ca1c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
index dd3b214..1c85b58 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
@@ -16,13 +16,27 @@
*/
package org.apache.eagle.app.environment.builder;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.HashMap;
import java.util.Map;
-/**
- * TODO: Not implemented yet.
- */
+import static org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter.convertFromSiddiDefinition;
+
public class CEPFunction implements TransformFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(CEPFunction.class);
+
+ private ExecutionPlanRuntime runtime;
+ private SiddhiManager siddhiManager;
private final CEPDefinition cepDefinition;
private Collector collector;
@@ -41,17 +55,60 @@ public class CEPFunction implements TransformFunction {
+ /**
+  * Creates the Siddhi manager and execution plan runtime for the configured query,
+  * registers a callback on the configured output stream and starts the runtime.
+  * Every event received on the output stream is converted to a column-name to value
+  * map and handed to the given collector.
+  *
+  * @param collector receives each output event as a map, keyed by the map's string form
+  * @throws IllegalStateException if the query does not define the configured output stream
+  */
@Override
public void open(Collector collector) {
- throw new IllegalStateException("TODO: Not implemented yet");
+ this.collector = collector;
+ this.siddhiManager = new SiddhiManager();
+ this.runtime = siddhiManager.createExecutionPlanRuntime(cepDefinition.getSiddhiQuery());
+ if (!runtime.getStreamDefinitionMap().containsKey(cepDefinition.outputStreamId)) {
+     // NOTE(review): the manager and runtime created above are not shut down on this
+     // path - confirm whether callers invoke close() after a failed open().
+     throw new IllegalStateException("Undefined output stream " + cepDefinition.outputStreamId);
+ }
+ // Resolve the output schema once here instead of re-converting it for every event.
+ final StreamDefinition schema = convertFromSiddiDefinition(runtime.getStreamDefinitionMap().get(cepDefinition.outputStreamId));
+ runtime.addCallback(cepDefinition.outputStreamId, new StreamCallback() {
+     @Override
+     public void receive(Event[] events) {
+         for (Event e : events) {
+             Object[] data = e.getData();
+             Map<String, Object> event = new HashMap<>();
+             for (StreamColumn column : schema.getColumns()) {
+                 // Null values are stored as-is, so no separate null branch is needed.
+                 event.put(column.getName(), data[schema.getColumnIndex(column.getName())]);
+             }
+             collector.collect(event.toString(), event);
+         }
+     }
+ });
+ runtime.start();
}
+ /**
+  * Sends the values of the given event to the Siddhi input handler of the configured
+  * input stream. If the runtime has no input handler for that stream, a warning is
+  * logged and the event is dropped.
+  *
+  * @param event the event whose values are forwarded to Siddhi
+  */
@Override
public void transform(Map event) {
- throw new IllegalStateException("TODO: Not implemented yet");
+ String streamId = cepDefinition.getInputStreamId();
+ InputHandler inputHandler = runtime.getInputHandler(streamId);
+ if (inputHandler == null) {
+     LOG.warn("No input handler found for stream {}", streamId);
+     return;
+ }
+ try {
+     // NOTE(review): values are sent in the map's iteration order; this presumably has
+     // to match the attribute order of the input stream definition - confirm that
+     // callers pass a map whose ordering guarantees this.
+     inputHandler.send(event.values().toArray());
+     // Logged only after a successful send; parameterized logging needs no level guard.
+     LOG.debug("sent event to siddhi stream {} ", streamId);
+ } catch (InterruptedException e) {
+     // Restore the interrupt flag so the calling thread can still observe the interruption.
+     Thread.currentThread().interrupt();
+     LOG.error(e.getMessage(), e);
+ }
}
+ /**
+  * Shuts down the Siddhi runtime and then the Siddhi manager. Either step is skipped
+  * when the corresponding object was never created, i.e. open() has not run.
+  */
@Override
public void close() {
- throw new IllegalStateException("TODO: Not implemented yet");
+ LOG.info("Closing handler for query {}", this.cepDefinition.getSiddhiQuery());
+ try {
+     if (this.runtime != null) {
+         this.runtime.shutdown();
+         LOG.info("Shutdown siddhi runtime {}", this.runtime.getName());
+     }
+ } finally {
+     // Always release the manager, even if shutting down the runtime failed.
+     if (this.siddhiManager != null) {
+         this.siddhiManager.shutdown();
+         LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
+     }
+ }
}
public static class CEPDefinition {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f899ca1c/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
index b1a4193..a8613df 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
@@ -18,18 +18,21 @@ package org.apache.eagle.app.stream;
import org.apache.eagle.app.environment.builder.CEPFunction;
import org.apache.eagle.app.environment.builder.Collector;
-import org.junit.Ignore;
+import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
public class CEPFunctionTest {
- @Test @Ignore("TODO: Not implement yet")
- public void testSiddhiFunction() {
+ @Test
+ public void testSiddhiFunction() throws InterruptedException {
+ Semaphore semaphore = new Semaphore(0);
CEPFunction function = new CEPFunction(
"define stream inputStream (name string, value double);\n "
- + "from inputStream#window.timeBatch( 1 min ) \n" +
+ + "from inputStream#window.timeBatch( 5 sec ) \n" +
"select name, avg(value) as avgValue\n" +
"group by name \n" +
"insert into outputStream ",
@@ -37,7 +40,9 @@ public class CEPFunctionTest {
Collector collector = new Collector() {
@Override
public void collect(Object key, Map event) {
-
+ Assert.assertTrue(event.get("avgValue") instanceof Double);
+ Assert.assertTrue(Double.valueOf(event.get("avgValue").toString()) == 0.97);
+ semaphore.release();
}
};
function.open(collector);
@@ -45,6 +50,11 @@ public class CEPFunctionTest {
put("name","cpu.usage");
put("value", 0.98);
}});
+ function.transform(new HashMap<String,Object>() {{
+ put("name","cpu.usage");
+ put("value", 0.96);
+ }});
+ Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(5, TimeUnit.SECONDS));
function.close();
}
}