You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2013/11/01 13:58:40 UTC
git commit: adding the cep_artifacts provided back to reflect the
signature of Nirmal
Updated Branches:
refs/heads/master 1c65c79a7 -> e1cecd1a4
adding the cep_artifacts provided back to reflect the signature of Nirmal
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e1cecd1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e1cecd1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e1cecd1a
Branch: refs/heads/master
Commit: e1cecd1a4ddefab3c44560a60b3c979f9704a631
Parents: 1c65c79
Author: Nirmal <ni...@apache.org>
Authored: Fri Nov 1 18:28:22 2013 +0530
Committer: Nirmal <ni...@apache.org>
Committed: Fri Nov 1 18:28:22 2013 +0530
----------------------------------------------------------------------
.../InstanceStatisticsEventBuilder.xml | 22 ++
.../LoadBalancerStatisticsEventBuilder.xml | 10 +
.../stratos.instance.stats_1.0.0_builder.xml | 10 +
.../AverageRequestsInflightEventFormatter.xml | 11 +
.../eventformatters/GradientEventFormatter.xml | 11 +
.../SecondDerivativeEventFormatter.xml | 11 +
.../AverageRequestsInflightFinder.xml | 19 ++
.../GradientOfRequestsInFlightFinder.xml | 18 ++
...SecondDerivativeOfRequestsInFlightFinder.xml | 19 ++
.../DefaultWSO2EventInputAdaptor.xml | 3 +
.../InstanceStatsInputAdaptor.xml | 9 +
.../DefaultWSO2EventOutputAdaptor.xml | 8 +
.../outputeventadaptors/JMSOutputAdaptor.xml | 8 +
.../stratos-cep-extensions/pom.xml | 57 ++++
.../GradientFinderWindowProcessor.java | 268 +++++++++++++++++
.../SecondDerivativeFinderWindowProcessor.java | 286 +++++++++++++++++++
16 files changed, 770 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml b/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
new file mode 100644
index 0000000..4b83f73
--- /dev/null
+++ b/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Event builder: reads JSON messages from the "instance-stats" JMS destination through
+ InstanceStatsInputAdaptor and maps $.instance.memberId, $.instance.clusterId and
+ $.instance.loadAverage onto the memberId, clusterId and loadAverage attributes of
+ stream stratos.instance.stats (version 1.0.0). Statistics and tracing are disabled. -->
+<eventBuilder name="InstanceStatisticsEventBuilder" statistics="disable"
+ trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
+ <from eventAdaptorName="InstanceStatsInputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">instance-stats</property>
+ </from>
+ <mapping customMapping="disable" type="json">
+ <property>
+ <from jsonPath="$.instance.memberId"/>
+ <to name="memberId" type="string"/>
+ </property>
+ <property>
+ <from jsonPath="$.instance.clusterId"/>
+ <to name="clusterId" type="string"/>
+ </property>
+ <property>
+ <from jsonPath="$.instance.loadAverage"/>
+ <to name="loadAverage" type="double"/>
+ </property>
+ </mapping>
+ <to streamName="stratos.instance.stats" version="1.0.0"/>
+</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml b/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
new file mode 100644
index 0000000..bef34f9
--- /dev/null
+++ b/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Event builder: passes wso2event messages of stream stratos.lb.stats (version 1.0.0),
+ received through DefaultWSO2EventInputAdaptor, on to the stream of the same name and
+ version without custom mapping. Tracing is enabled for this builder. -->
+<eventBuilder name="LoadBalancerStatisticsEventBuilder"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventbuilder">
+ <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
+ <property name="stream">stratos.lb.stats</property>
+ <property name="version">1.0.0</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="stratos.lb.stats" version="1.0.0"/>
+</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml b/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
new file mode 100644
index 0000000..ed6b588
--- /dev/null
+++ b/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Event builder: passes wso2event messages of stream stratos.instance.stats
+ (version 1.0.0), received through DefaultWSO2EventInputAdaptor, on to the stream of the
+ same name and version without custom mapping. Statistics and tracing are disabled. -->
+<eventBuilder name="stratos.instance.stats_1.0.0_builder"
+ statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
+ <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
+ <property name="stream">stratos.instance.stats</property>
+ <property name="version">1.0.0</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="stratos.instance.stats" version="1.0.0"/>
+</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml b/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
new file mode 100644
index 0000000..19a4f30
--- /dev/null
+++ b/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Event formatter: renders each event of stream aggregated_requests_stats (version 1.0.0)
+ as the inline JSON template below, filling in the cluster_id and count attributes, and
+ publishes it to the "summarized-health-stats" JMS destination through JMSOutputAdaptor.
+ Tracing is enabled for this formatter. -->
+<eventFormatter name="AverageRequestsInflightEventFormatter"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="aggregated_requests_stats" version="1.0.0"/>
+ <mapping customMapping="enable" type="json">
+ <inline>{"average_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+ </mapping>
+ <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">summarized-health-stats</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml b/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
new file mode 100644
index 0000000..e3d9e82
--- /dev/null
+++ b/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Event formatter: renders each event of stream gradient_stats (version 1.0.0) as the
+ inline JSON template below, filling in the cluster_id and count attributes, and publishes
+ it to the "summarized-health-stats" JMS destination through JMSOutputAdaptor.
+ Statistics and tracing are disabled. -->
+<eventFormatter name="GradientEventFormatter" statistics="disable"
+ trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="gradient_stats" version="1.0.0"/>
+ <mapping customMapping="enable" type="json">
+ <inline>{"gradient_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+ </mapping>
+ <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">summarized-health-stats</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml b/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
new file mode 100644
index 0000000..ddb1cc4
--- /dev/null
+++ b/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Event formatter: renders each event of stream second_derivative_of_requests_stats
+ (version 1.0.0) as the inline JSON template below, filling in the cluster_id and count
+ attributes, and publishes it to the "summarized-health-stats" JMS destination through
+ JMSOutputAdaptor. Statistics and tracing are disabled. -->
+<eventFormatter name="SecondDerivativeEventFormatter"
+ statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="second_derivative_of_requests_stats" version="1.0.0"/>
+ <mapping customMapping="enable" type="json">
+ <inline>{"second_derivative_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+ </mapping>
+ <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">summarized-health-stats</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml b/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
new file mode 100644
index 0000000..d81d859
--- /dev/null
+++ b/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Execution plan: imports stratos.lb.stats (version 1.0.0) as lbStats1, partitions it by
+ cluster_id, and for each 1 minute timeBatch window inserts cluster_id together with
+ avg(in_flight_requests) into aggregated_requests_stats, which is exported as version 1.0.0.
+ NOTE(review): the averaged value is exposed under the attribute name "count". -->
+<executionPlan name="AverageRequestsInflightFinder" statistics="disable"
+ trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+ <description>This will average the number of requests in flight over a minute.</description>
+ <siddhiConfiguration>
+ <property name="siddhi.enable.distributed.processing">false</property>
+ <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+ </siddhiConfiguration>
+ <importedStreams>
+ <stream as="lbStats1" name="stratos.lb.stats" version="1.0.0"/>
+ </importedStreams>
+ <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
+from lbStats1#window.timeBatch(1 min)
+select cluster_id, avg(in_flight_requests) as count insert into aggregated_requests_stats partition by lbStats1Partition;]]></queryExpressions>
+ <exportedStreams>
+ <stream name="aggregated_requests_stats"
+ valueOf="aggregated_requests_stats" version="1.0.0"/>
+ </exportedStreams>
+</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml b/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..7775de0
--- /dev/null
+++ b/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Execution plan: imports stratos.lb.stats (version 1.0.0) as lbStats2, partitions it by
+ cluster_id, runs it through the custom window stratos:gradient(1 min, in_flight_requests)
+ and inserts cluster_id together with in_flight_requests (renamed to "count") into
+ gradient_stats, which is exported as version 1.0.0.
+ NOTE(review): the partition is named lbStats1Partition although the imported stream alias
+ here is lbStats2; the name is used consistently inside this plan, so this looks like a
+ copy/paste leftover rather than a functional problem. Confirm before renaming. -->
+<executionPlan name="GradientOfRequestsInFlightFinder"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+ <description>This will find the gradient of the number of requests in flight over a minute.</description>
+ <siddhiConfiguration>
+ <property name="siddhi.enable.distributed.processing">false</property>
+ <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+ </siddhiConfiguration>
+ <importedStreams>
+ <stream as="lbStats2" name="stratos.lb.stats" version="1.0.0"/>
+ </importedStreams>
+ <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
+from lbStats2#window.stratos:gradient(1 min, in_flight_requests)
+select cluster_id, in_flight_requests as count insert into gradient_stats partition by lbStats1Partition;]]></queryExpressions>
+ <exportedStreams>
+ <stream name="gradient_stats" valueOf="gradient_stats" version="1.0.0"/>
+ </exportedStreams>
+</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml b/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..af2bd0a
--- /dev/null
+++ b/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Execution plan: imports stratos.lb.stats (version 1.0.0) as lbStats3, partitions it by
+ cluster_id, runs it through the custom window
+ stratos:secondDerivative(1 min, in_flight_requests) and inserts cluster_id together with
+ in_flight_requests (renamed to "count") into second_derivative_of_requests_stats, which is
+ exported as version 1.0.0.
+ NOTE(review): the partition is named lbStats1Partition although the imported stream alias
+ here is lbStats3; the name is used consistently inside this plan, so this looks like a
+ copy/paste leftover rather than a functional problem. Confirm before renaming. -->
+<executionPlan name="SecondDerivativeOfRequestsInFlightFinder"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+ <description>This will find the second derivative of the number of requests in flight over a minute.</description>
+ <siddhiConfiguration>
+ <property name="siddhi.enable.distributed.processing">false</property>
+ <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+ </siddhiConfiguration>
+ <importedStreams>
+ <stream as="lbStats3" name="stratos.lb.stats" version="1.0.0"/>
+ </importedStreams>
+ <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
+from lbStats3#window.stratos:secondDerivative(1 min, in_flight_requests)
+select cluster_id, in_flight_requests as count insert into second_derivative_of_requests_stats partition by lbStats1Partition;]]></queryExpressions>
+ <exportedStreams>
+ <stream name="second_derivative_of_requests_stats"
+ valueOf="second_derivative_of_requests_stats" version="1.0.0"/>
+ </exportedStreams>
+</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml b/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
new file mode 100644
index 0000000..8cc5e89
--- /dev/null
+++ b/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
@@ -0,0 +1,3 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Input event adaptor of type wso2event, declared without any properties. Statistics are
+ disabled and tracing is enabled. -->
+<inputEventAdaptor name="DefaultWSO2EventInputAdaptor"
+ statistics="disable" trace="enable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"/>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml b/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
new file mode 100644
index 0000000..a687546
--- /dev/null
+++ b/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- JMS input event adaptor: looks up the topicConnectionfactory connection factory through
+ org.wso2.andes.jndi.PropertiesFileInitialContextFactory and subscribes non-durably to a
+ topic destination.
+ NOTE(review): java.naming.provider.url is an absolute path to jndi.properties under
+ /home/ubuntu/packages/cep/wso2cep-3.0.0; it has to be adjusted for any installation that
+ does not use exactly that location. -->
+<inputEventAdaptor name="InstanceStatsInputAdaptor" statistics="disable"
+ trace="disable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
+ <property name="transport.jms.SubscriptionDurable">false</property>
+ <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
+ <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
+ <property name="transport.jms.DestinationType">topic</property>
+</inputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml b/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
new file mode 100644
index 0000000..59ba20d
--- /dev/null
+++ b/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Output event adaptor of type wso2event: sends events to tcp://localhost:7661 and
+ authenticates against ssl://localhost:7761.
+ NOTE(review): the username and password are hard-coded as admin/admin and both URLs point
+ at localhost; replace them with deployment-specific values before using this outside a
+ local test setup. -->
+<outputEventAdaptor name="DefaultWSO2EventOutputAdaptor"
+ statistics="disable" trace="disable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="username">admin</property>
+ <property name="receiverURL">tcp://localhost:7661</property>
+ <property name="password">admin</property>
+ <property name="authenticatorURL">ssl://localhost:7761</property>
+</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml b/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
new file mode 100644
index 0000000..bb4d6d7
--- /dev/null
+++ b/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- JMS output event adaptor: looks up the topicConnectionfactory connection factory through
+ org.wso2.andes.jndi.PropertiesFileInitialContextFactory and publishes to a topic
+ destination. Tracing is enabled for this adaptor.
+ NOTE(review): java.naming.provider.url is an absolute path to jndi.properties under
+ /home/ubuntu/packages/cep/wso2cep-3.0.0; it has to be adjusted for any installation that
+ does not use exactly that location. -->
+<outputEventAdaptor name="JMSOutputAdaptor" statistics="disable"
+ trace="enable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
+ <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
+ <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
+ <property name="transport.jms.DestinationType">topic</property>
+</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/stratos-cep-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/pom.xml b/tools/cep_artifacts/stratos-cep-extensions/pom.xml
new file mode 100644
index 0000000..e765538
--- /dev/null
+++ b/tools/cep_artifacts/stratos-cep-extensions/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements. See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership. The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing,
+ # software distributed under the License is distributed on an
+ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ # KIND, either express or implied. See the License for the
+ # specific language governing permissions and limitations
+ # under the License.
+ -->
+<!-- Maven build for the Stratos CEP extension module; its only declared dependency is
+ siddhi-core 2.0.0-wso2v4. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>org.apache.stratos.cep.extension</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <!-- NOTE(review): this repository is addressed over plain http; recent Maven releases
+ block external http repositories by default. Confirm whether an https URL is
+ available for it. -->
+ <repositories>
+ <repository>
+ <id>wso2-maven2-repository</id>
+ <name>WSO2 Maven2 Repository</name>
+ <url>http://dist.wso2.org/maven2</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-core</artifactId>
+ <version>2.0.0-wso2v4</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Compiles with Java 1.6 source and target levels.
+ NOTE(review): no plugin version is pinned here. -->
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
new file mode 100644
index 0000000..6ad1e87
--- /dev/null
+++ b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@SiddhiExtension(namespace = "stratos", function = "gradient")
+public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+ static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class);
+ private ScheduledExecutorService eventRemoverScheduler;
+ private long timeToKeep;
+ private int subjectedAttrIndex;
+ private Attribute.Type subjectedAttrType;
+ private List<InEvent> newEventList;
+ private List<RemoveEvent> oldEventList;
+ private ThreadBarrier threadBarrier;
+ private ISchedulerSiddhiQueue<StreamEvent> window;
+
+ @Override
+ protected void processEvent(InEvent event) {
+ acquireLock();
+ try {
+ newEventList.add(event);
+ } finally {
+ releaseLock();
+ }
+ }
+
+ @Override
+ protected void processEvent(InListEvent listEvent) {
+ acquireLock();
+ try {
+ System.out.println(listEvent);
+ for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+ newEventList.add((InEvent) listEvent.getEvent(i));
+ }
+ } finally {
+ releaseLock();
+ }
+ }
+
+ @Override
+ public Iterator<StreamEvent> iterator() {
+ return window.iterator();
+ }
+
+ @Override
+ public Iterator<StreamEvent> iterator(String predicate) {
+ if (siddhiContext.isDistributedProcessingEnabled()) {
+ return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+ } else {
+ return window.iterator();
+ }
+ }
+
+
+ @Override
+ public void run() {
+ acquireLock();
+ try {
+ long scheduledTime = System.currentTimeMillis();
+ try {
+ oldEventList.clear();
+ while (true) {
+ threadBarrier.pass();
+ RemoveEvent removeEvent = (RemoveEvent) window.poll();
+ if (removeEvent == null) {
+ if (oldEventList.size() > 0) {
+ nextProcessor.process(new RemoveListEvent(
+ oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+ oldEventList.clear();
+ }
+
+ if (newEventList.size() > 0) {
+ InEvent[] inEvents =
+ newEventList.toArray(new InEvent[newEventList.size()]);
+ for (InEvent inEvent : inEvents) {
+ window.put(new RemoveEvent(inEvent, -1));
+ }
+
+ InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]);
+
+ for (InEvent inEvent : gradientEvents) {
+ window.put(new RemoveEvent(inEvent, -1));
+ }
+ nextProcessor.process(new InListEvent(gradientEvents));
+
+ newEventList.clear();
+ }
+
+ long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+ if (diff > 0) {
+ try {
+ eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException ex) {
+ log.warn("scheduling cannot be accepted for execution: elementID " +
+ elementId);
+ }
+ break;
+ }
+ scheduledTime = System.currentTimeMillis();
+ } else {
+ oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t.getMessage(), t);
+ }
+ } finally {
+ releaseLock();
+ }
+ }
+
+
+ /**
+ * This function will calculate the linear gradient (per second) of the events received during
+ * a specified time period.
+ */
+ private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) {
+ double firstVal = 0.0, lastVal = 0.0;
+ // FIXME I'm not sure whether there's some other good way to do correct casting,
+ // based on the type.
+ if (Type.DOUBLE.equals(subjectedAttrType)) {
+ firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
+ lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
+ } else if (Type.INT.equals(subjectedAttrType)) {
+ firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
+ lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
+ } else if (Type.LONG.equals(subjectedAttrType)) {
+ firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
+ lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
+ } else if (Type.FLOAT.equals(subjectedAttrType)) {
+ firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
+ lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
+ }
+
+ long t1 = firstInEvent.getTimeStamp();
+ long t2 = lastInEvent.getTimeStamp();
+ long tGap = t2 - t1;
+ double gradient = 0.0;
+ if (tGap > 0) {
+ gradient = ((lastVal - firstVal) * 1000) / tGap;
+ }
+ log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+ " First val: " + firstVal + " Time Gap: " + tGap );
+ Object[] data = firstInEvent.getData().clone();
+ data[subjectedAttrIndex] = gradient;
+ InEvent gradientEvent =
+ new InEvent(firstInEvent.getStreamId(), (t1+t2)/2,
+ data);
+ InEvent[] output = new InEvent[1];
+ output[0] = gradientEvent;
+ return output;
+ }
+
+ @Override
+ protected Object[] currentState() {
+ return new Object[]{window.currentState(), oldEventList, newEventList};
+ }
+
+ @Override
+ protected void restoreState(Object[] data) {
+ window.restoreState(data);
+ window.restoreState((Object[]) data[0]);
+ oldEventList = ((ArrayList<RemoveEvent>) data[1]);
+ newEventList = ((ArrayList<InEvent>) data[2]);
+ window.reSchedule();
+ }
+
+ @Override
+ protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+ if (parameters[0] instanceof IntConstant) {
+ timeToKeep = ((IntConstant) parameters[0]).getValue();
+ } else {
+ timeToKeep = ((LongConstant) parameters[0]).getValue();
+ }
+
+ String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
+ subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+ subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+ oldEventList = new ArrayList<RemoveEvent>();
+ if (this.siddhiContext.isDistributedProcessingEnabled()) {
+ newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+ } else {
+ newEventList = new ArrayList<InEvent>();
+ }
+
+ if (this.siddhiContext.isDistributedProcessingEnabled()) {
+ window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+ } else {
+ window = new SchedulerSiddhiQueue<StreamEvent>(this);
+ }
+ //Ordinary scheduling
+ window.schedule();
+
+ }
+
+ @Override
+ public void schedule() {
+ eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+ }
+
+ public void scheduleNow() {
+ eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+ this.eventRemoverScheduler = scheduledExecutorService;
+ }
+
+ public void setThreadBarrier(ThreadBarrier threadBarrier) {
+ this.threadBarrier = threadBarrier;
+ }
+
+ @Override
+ public void destroy(){
+ oldEventList = null;
+ newEventList = null;
+ window = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
new file mode 100644
index 0000000..4fb7018
--- /dev/null
+++ b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Window processor registered as the Siddhi extension {@code stratos:secondDerivative}.
+ *
+ * <p>Incoming events are only buffered. On every scheduled {@link #run()} the buffered batch
+ * is split into two halves, the gradient of the subjected attribute is computed over each
+ * half, and the gradient between those two results (the second derivative) is sent to the
+ * next processor as a single-element {@link InListEvent}. A batch of fewer than three events
+ * produces no output.
+ *
+ * <p>Extension parameters, as read by {@code init}:
+ * <ol>
+ *   <li>an int or long constant: the batching period, used as milliseconds;</li>
+ *   <li>a variable naming the attribute whose second derivative is wanted.</li>
+ * </ol>
+ */
+@SiddhiExtension(namespace = "stratos", function = "secondDerivative")
+public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
+
+    /** Executor on which this processor re-schedules itself. */
+    private ScheduledExecutorService eventRemoverScheduler;
+    /** Length of one batching period, in milliseconds. */
+    private long timeToKeep;
+    /** Position of the subjected attribute inside an event's data array. */
+    private int subjectedAttrIndex;
+    /** Declared type of the subjected attribute in the stream definition. */
+    private Attribute.Type subjectedAttrType;
+    /** Events received since the last run. */
+    private List<InEvent> newEventList;
+    /** Events drained from the window during the current run. */
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    /**
+     * Buffers a single event until the next scheduled run; nothing is forwarded here.
+     *
+     * @param event the incoming event
+     */
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Buffers every active event of a list event until the next scheduled run.
+     *
+     * @param listEvent the incoming batch of events
+     */
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        acquireLock();
+        try {
+            // Diagnostic output goes through the logger instead of stdout.
+            if (log.isDebugEnabled()) {
+                log.debug("Received list event: " + listEvent);
+            }
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * @return an iterator over the events currently held in the window queue
+     */
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns an iterator over the window queue. The predicate is only applied when
+     * distributed processing is enabled; otherwise it is ignored.
+     *
+     * @param predicate filter handed to the grid-backed queue
+     * @return an iterator over the events currently held in the window queue
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     * Scheduled batch step. Drains the window, forwards the drained events as a
+     * {@link RemoveListEvent}, moves the buffered events into the window, emits the second
+     * derivative when at least three events were buffered, and finally re-schedules itself
+     * for the remainder of the period. Any failure is logged and not propagated.
+     */
+    @Override
+    public void run() {
+        acquireLock();
+        try {
+            long scheduledTime = System.currentTimeMillis();
+            try {
+                oldEventList.clear();
+                while (true) {
+                    threadBarrier.pass();
+                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
+                    if (removeEvent != null) {
+                        // Still draining the previous batch out of the window.
+                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+                        continue;
+                    }
+
+                    // Window is empty: publish everything that was drained.
+                    if (!oldEventList.isEmpty()) {
+                        nextProcessor.process(new RemoveListEvent(
+                                oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+                        oldEventList.clear();
+                    }
+
+                    if (!newEventList.isEmpty()) {
+                        // Work on an exact-size snapshot; all indexing below uses the
+                        // snapshot length rather than re-reading the live list size.
+                        InEvent[] inEvents = newEventList.toArray(new InEvent[0]);
+                        for (InEvent inEvent : inEvents) {
+                            window.put(new RemoveEvent(inEvent, -1));
+                        }
+
+                        // in order to find second derivative, we need at least 3 events.
+                        if (inEvents.length > 2) {
+                            int half = inEvents.length / 2;
+                            InEvent firstDerivative1 =
+                                    gradient(inEvents[0], inEvents[half - 1], null)[0];
+                            InEvent firstDerivative2 =
+                                    gradient(inEvents[half], inEvents[inEvents.length - 1], null)[0];
+                            // The first derivatives always carry a double in the subjected slot.
+                            InEvent[] secondDerivative =
+                                    gradient(firstDerivative1, firstDerivative2, Type.DOUBLE);
+
+                            for (InEvent inEvent : secondDerivative) {
+                                window.put(new RemoveEvent(inEvent, -1));
+                            }
+                            nextProcessor.process(new InListEvent(secondDerivative));
+                        } else if (log.isDebugEnabled()) {
+                            log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " +
+                                      inEvents.length);
+                        }
+
+                        newEventList.clear();
+                    }
+
+                    long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+                    if (diff > 0) {
+                        try {
+                            eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+                        } catch (RejectedExecutionException ex) {
+                            log.warn("scheduling cannot be accepted for execution: elementID " +
+                                     elementId);
+                        }
+                        break;
+                    }
+                    // The period already elapsed while processing: start the next one now.
+                    scheduledTime = System.currentTimeMillis();
+                }
+            } catch (Throwable t) {
+                log.error(t.getMessage(), t);
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Calculates the linear gradient of the subjected attribute between two events, scaled
+     * by 1000 over the timestamp gap (per second, assuming timestamps are in milliseconds).
+     * The gradient is 0 when the timestamp gap is not positive.
+     *
+     * @param firstInEvent earlier event
+     * @param lastInEvent  later event
+     * @param type         type to read the attribute as, or {@code null} to use the type
+     *                     declared in the stream definition
+     * @return a one-element array holding a copy of {@code firstInEvent} whose subjected
+     *         attribute is replaced by the gradient and whose timestamp is the midpoint
+     *         of the two input timestamps
+     */
+    private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) {
+        Type attrType = type == null ? subjectedAttrType : type;
+        double firstVal = numericValue(firstInEvent, attrType);
+        double lastVal = numericValue(lastInEvent, attrType);
+
+        long t1 = firstInEvent.getTimeStamp();
+        long t2 = lastInEvent.getTimeStamp();
+        long tGap = t2 - t1;
+        double gradient = 0.0;
+        if (tGap > 0) {
+            gradient = ((lastVal - firstVal) * 1000) / tGap;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+                      " First val: " + firstVal + " Time Gap: " + tGap );
+        }
+        Object[] data = firstInEvent.getData().clone();
+        data[subjectedAttrIndex] = gradient;
+        InEvent gradientEvent = new InEvent(firstInEvent.getStreamId(), t1 + (tGap / 2), data);
+        return new InEvent[]{gradientEvent};
+    }
+
+    /**
+     * Reads the subjected attribute of an event as a double. Attributes whose type is not
+     * DOUBLE, INT, LONG or FLOAT are read as 0. The value is read through {@link Number},
+     * so the boxed class does not have to match the declared type exactly.
+     */
+    private double numericValue(InEvent event, Type attrType) {
+        boolean numeric = Type.DOUBLE.equals(attrType) || Type.INT.equals(attrType)
+                          || Type.LONG.equals(attrType) || Type.FLOAT.equals(attrType);
+        if (!numeric) {
+            return 0.0;
+        }
+        return ((Number) event.getData()[subjectedAttrIndex]).doubleValue();
+    }
+
+    /**
+     * @return snapshot laid out as {@code {window state, oldEventList, newEventList}}
+     */
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    /**
+     * Restores a snapshot produced by {@link #currentState()} and re-schedules the window.
+     *
+     * @param data snapshot laid out as {@code {window state, oldEventList, newEventList}}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void restoreState(Object[] data) {
+        // Only element 0 is the window's own state; the whole array must not be passed.
+        window.restoreState((Object[]) data[0]);
+        // Cast to the interface: the new-event list is not an ArrayList in distributed mode.
+        oldEventList = (List<RemoveEvent>) data[1];
+        newEventList = (List<InEvent>) data[2];
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the extension parameters, resolves the subjected attribute against the stream
+     * definition, creates the event buffers and the window queue, and starts scheduling.
+     *
+     * <p>NOTE(review): the body reads the inherited {@code siddhiContext}, {@code async} and
+     * {@code elementId} members rather than relying only on the arguments; presumably the
+     * framework populates them before calling this method. Confirm against WindowProcessor.
+     *
+     * @param parameters       {@code [0]} period as int or long constant (milliseconds),
+     *                         {@code [1]} variable naming the subjected attribute
+     * @param nextProcessor    downstream element (not used directly here)
+     * @param streamDefinition definition used to look up the attribute position and type
+     * @param elementId        id used to name the distributed list and queue
+     * @param async            not read directly here
+     * @param siddhiContext    not read directly here
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            newEventList = new ArrayList<InEvent>();
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+    }
+
+    /**
+     * Schedules this processor to run once, {@code timeToKeep} milliseconds from now.
+     */
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Schedules this processor to run with zero delay.
+     */
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @param scheduledExecutorService executor used for all subsequent scheduling
+     */
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    /**
+     * @param threadBarrier barrier passed at the start of every loop iteration in {@link #run()}
+     */
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /**
+     * Drops the references to both event buffers and to the window queue.
+     */
+    @Override
+    public void destroy() {
+        oldEventList = null;
+        newEventList = null;
+        window = null;
+    }
+}