You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/05 09:38:32 UTC

incubator-eagle git commit: [EAGLE-818] implement CEPFunction

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 1c81c0865 -> f899ca1c2


[EAGLE-818] implement CEPFunction

https://issues.apache.org/jira/browse/EAGLE-818

Author: Qingwen Zhao <qi...@gmail.com>
Author: Zhao, Qingwen <qi...@apache.org>

Closes #709 from qingwen220/EAGLE-818.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f899ca1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f899ca1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f899ca1c

Branch: refs/heads/master
Commit: f899ca1c29533f571e1d2abcfa93e164519937c6
Parents: 1c81c08
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Mon Dec 5 17:38:24 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Dec 5 17:38:24 2016 +0800

----------------------------------------------------------------------
 .../app/environment/builder/CEPFunction.java    | 69 ++++++++++++++++++--
 .../eagle/app/stream/CEPFunctionTest.java       | 20 ++++--
 2 files changed, 78 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f899ca1c/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
index dd3b214..1c85b58 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CEPFunction.java
@@ -16,13 +16,27 @@
  */
 package org.apache.eagle.app.environment.builder;
 
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+import java.util.HashMap;
 import java.util.Map;
 
-/**
- * TODO: Not implemented yet.
- */
+import static org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter.convertFromSiddiDefinition;
+
 public class CEPFunction implements TransformFunction {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CEPFunction.class);
+
+    private ExecutionPlanRuntime runtime;
+    private SiddhiManager siddhiManager;
     private final CEPDefinition cepDefinition;
     private Collector collector;
 
@@ -41,17 +55,60 @@ public class CEPFunction implements TransformFunction {
 
     @Override
     public void open(Collector collector) {
-        throw new IllegalStateException("TODO: Not implemented yet");
+        this.collector = collector;
+        this.siddhiManager = new SiddhiManager();
+        this.runtime = siddhiManager.createExecutionPlanRuntime(cepDefinition.getSiddhiQuery());
+        if (runtime.getStreamDefinitionMap().containsKey(cepDefinition.outputStreamId)) {
+            runtime.addCallback(cepDefinition.outputStreamId, new StreamCallback() {
+                @Override
+                public void receive(Event[] events) {
+                    for (Event e : events) {
+                        StreamDefinition schema = convertFromSiddiDefinition(runtime.getStreamDefinitionMap().get(cepDefinition.outputStreamId));
+                        Map<String, Object> event = new HashMap<>();
+                        for (StreamColumn column : schema.getColumns()) {
+                            Object obj = e.getData()[schema.getColumnIndex(column.getName())];
+                            if (obj == null) {
+                                event.put(column.getName(), null);
+                                continue;
+                            }
+                            event.put(column.getName(), obj);
+                        }
+                        collector.collect(event.toString(), event);
+                    }
+                }
+            });
+        } else {
+            throw new IllegalStateException("Undefined output stream " + cepDefinition.outputStreamId);
+        }
+        runtime.start();
     }
 
     @Override
     public void transform(Map event) {
-        throw new IllegalStateException("TODO: Not implemented yet");
+        String streamId = cepDefinition.getInputStreamId();
+        InputHandler inputHandler = runtime.getInputHandler(streamId);
+
+        if (inputHandler != null) {
+            try {
+                inputHandler.send(event.values().toArray());
+            } catch (InterruptedException e) {
+                LOG.error(e.getMessage(), e);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("sent event to siddhi stream {} ", streamId);
+            }
+        } else {
+            LOG.warn("No input handler found for stream {}", streamId);
+        }
     }
 
     @Override
     public void close() {
-        throw new IllegalStateException("TODO: Not implemented yet");
+        LOG.info("Closing handler for query {}", this.cepDefinition.getSiddhiQuery());
+        this.runtime.shutdown();
+        LOG.info("Shutdown siddhi runtime {}", this.runtime.getName());
+        this.siddhiManager.shutdown();
+        LOG.info("Shutdown siddhi manager {}", this.siddhiManager);
     }
 
     public static class CEPDefinition {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f899ca1c/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
index b1a4193..a8613df 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
@@ -18,18 +18,21 @@ package org.apache.eagle.app.stream;
 
 import org.apache.eagle.app.environment.builder.CEPFunction;
 import org.apache.eagle.app.environment.builder.Collector;
-import org.junit.Ignore;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 public class CEPFunctionTest {
-    @Test @Ignore("TODO: Not implement yet")
-    public void testSiddhiFunction() {
+    @Test
+    public void testSiddhiFunction() throws InterruptedException {
+        Semaphore semaphore = new Semaphore(0);
         CEPFunction function = new CEPFunction(
             "define stream inputStream (name string, value double);\n "
-                + "from inputStream#window.timeBatch( 1 min ) \n" +
+                + "from inputStream#window.timeBatch( 5 sec ) \n" +
                 "select name, avg(value) as avgValue\n" +
                 "group by name \n" +
                 "insert into outputStream ",
@@ -37,7 +40,9 @@ public class CEPFunctionTest {
         Collector collector = new Collector() {
             @Override
             public void collect(Object key, Map event) {
-
+                Assert.assertTrue(event.get("avgValue") instanceof Double);
+                Assert.assertTrue(Double.valueOf(event.get("avgValue").toString()) == 0.97);
+                semaphore.release();
             }
         };
         function.open(collector);
@@ -45,6 +50,11 @@ public class CEPFunctionTest {
             put("name","cpu.usage");
             put("value", 0.98);
         }});
+        function.transform(new HashMap<String,Object>() {{
+            put("name","cpu.usage");
+            put("value", 0.96);
+        }});
+        Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(5, TimeUnit.SECONDS));
         function.close();
     }
 }