You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2013/11/01 13:58:40 UTC

git commit: adding the cep_artifacts proided back to reflect the signature of Nirmal

Updated Branches:
  refs/heads/master 1c65c79a7 -> e1cecd1a4


adding the cep_artifacts proided back to reflect the signature of Nirmal


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e1cecd1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e1cecd1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e1cecd1a

Branch: refs/heads/master
Commit: e1cecd1a4ddefab3c44560a60b3c979f9704a631
Parents: 1c65c79
Author: Nirmal <ni...@apache.org>
Authored: Fri Nov 1 18:28:22 2013 +0530
Committer: Nirmal <ni...@apache.org>
Committed: Fri Nov 1 18:28:22 2013 +0530

----------------------------------------------------------------------
 .../InstanceStatisticsEventBuilder.xml          |  22 ++
 .../LoadBalancerStatisticsEventBuilder.xml      |  10 +
 .../stratos.instance.stats_1.0.0_builder.xml    |  10 +
 .../AverageRequestsInflightEventFormatter.xml   |  11 +
 .../eventformatters/GradientEventFormatter.xml  |  11 +
 .../SecondDerivativeEventFormatter.xml          |  11 +
 .../AverageRequestsInflightFinder.xml           |  19 ++
 .../GradientOfRequestsInFlightFinder.xml        |  18 ++
 ...SecondDerivativeOfRequestsInFlightFinder.xml |  19 ++
 .../DefaultWSO2EventInputAdaptor.xml            |   3 +
 .../InstanceStatsInputAdaptor.xml               |   9 +
 .../DefaultWSO2EventOutputAdaptor.xml           |   8 +
 .../outputeventadaptors/JMSOutputAdaptor.xml    |   8 +
 .../stratos-cep-extensions/pom.xml              |  57 ++++
 .../GradientFinderWindowProcessor.java          | 268 +++++++++++++++++
 .../SecondDerivativeFinderWindowProcessor.java  | 286 +++++++++++++++++++
 16 files changed, 770 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml b/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
new file mode 100644
index 0000000..4b83f73
--- /dev/null
+++ b/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventBuilder name="InstanceStatisticsEventBuilder" statistics="disable"
+    trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
+    <from eventAdaptorName="InstanceStatsInputAdaptor" eventAdaptorType="jms">
+        <property name="transport.jms.Destination">instance-stats</property>
+    </from>
+    <mapping customMapping="disable" type="json">
+        <property>
+            <from jsonPath="$.instance.memberId"/>
+            <to name="memberId" type="string"/>
+        </property>
+        <property>
+            <from jsonPath="$.instance.clusterId"/>
+            <to name="clusterId" type="string"/>
+        </property>
+        <property>
+            <from jsonPath="$.instance.loadAverage"/>
+            <to name="loadAverage" type="double"/>
+        </property>
+    </mapping>
+    <to streamName="stratos.instance.stats" version="1.0.0"/>
+</eventBuilder>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml b/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
new file mode 100644
index 0000000..bef34f9
--- /dev/null
+++ b/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventBuilder name="LoadBalancerStatisticsEventBuilder"
+    statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventbuilder">
+    <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
+        <property name="stream">stratos.lb.stats</property>
+        <property name="version">1.0.0</property>
+    </from>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to streamName="stratos.lb.stats" version="1.0.0"/>
+</eventBuilder>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml b/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
new file mode 100644
index 0000000..ed6b588
--- /dev/null
+++ b/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventBuilder name="stratos.instance.stats_1.0.0_builder"
+    statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
+    <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
+        <property name="stream">stratos.instance.stats</property>
+        <property name="version">1.0.0</property>
+    </from>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to streamName="stratos.instance.stats" version="1.0.0"/>
+</eventBuilder>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml b/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
new file mode 100644
index 0000000..19a4f30
--- /dev/null
+++ b/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventFormatter name="AverageRequestsInflightEventFormatter"
+  statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+  <from streamName="aggregated_requests_stats" version="1.0.0"/>
+  <mapping customMapping="enable" type="json">
+    <inline>{"average_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+  </mapping>
+  <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+    <property name="transport.jms.Destination">summarized-health-stats</property>
+  </to>
+</eventFormatter>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml b/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
new file mode 100644
index 0000000..e3d9e82
--- /dev/null
+++ b/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventFormatter name="GradientEventFormatter" statistics="disable"
+  trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
+  <from streamName="gradient_stats" version="1.0.0"/>
+  <mapping customMapping="enable" type="json">
+    <inline>{"gradient_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+  </mapping>
+  <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+    <property name="transport.jms.Destination">summarized-health-stats</property>
+  </to>
+</eventFormatter>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml b/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
new file mode 100644
index 0000000..ddb1cc4
--- /dev/null
+++ b/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventFormatter name="SecondDerivativeEventFormatter"
+  statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
+  <from streamName="second_derivative_of_requests_stats" version="1.0.0"/>
+  <mapping customMapping="enable" type="json">
+    <inline>{"second_derivative_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+  </mapping>
+  <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+    <property name="transport.jms.Destination">summarized-health-stats</property>
+  </to>
+</eventFormatter>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml b/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
new file mode 100644
index 0000000..d81d859
--- /dev/null
+++ b/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<executionPlan name="AverageRequestsInflightFinder" statistics="disable"
+  trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+  <description>This will average the number of requests in flight over a minute.</description>
+  <siddhiConfiguration>
+    <property name="siddhi.enable.distributed.processing">false</property>
+    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+  </siddhiConfiguration>
+  <importedStreams>
+    <stream as="lbStats1" name="stratos.lb.stats" version="1.0.0"/>
+  </importedStreams>
+  <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id; 
+from lbStats1#window.timeBatch(1 min)
+select cluster_id, avg(in_flight_requests) as count insert into aggregated_requests_stats partition by lbStats1Partition;]]></queryExpressions>
+  <exportedStreams>
+    <stream name="aggregated_requests_stats"
+      valueOf="aggregated_requests_stats" version="1.0.0"/>
+  </exportedStreams>
+</executionPlan>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml b/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..7775de0
--- /dev/null
+++ b/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<executionPlan name="GradientOfRequestsInFlightFinder"
+  statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+  <description>This will find the gradient of the number of requests in flight over a minute.</description>
+  <siddhiConfiguration>
+    <property name="siddhi.enable.distributed.processing">false</property>
+    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+  </siddhiConfiguration>
+  <importedStreams>
+    <stream as="lbStats2" name="stratos.lb.stats" version="1.0.0"/>
+  </importedStreams>
+  <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id; 
+from lbStats2#window.stratos:gradient(1 min, in_flight_requests)
+select cluster_id, in_flight_requests as count insert into gradient_stats partition by lbStats1Partition;]]></queryExpressions>
+  <exportedStreams>
+    <stream name="gradient_stats" valueOf="gradient_stats" version="1.0.0"/>
+  </exportedStreams>
+</executionPlan>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml b/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..af2bd0a
--- /dev/null
+++ b/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<executionPlan name="SecondDerivativeOfRequestsInFlightFinder"
+  statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+  <description>This will find the second derivative of the number of requests in flight over a minute.</description>
+  <siddhiConfiguration>
+    <property name="siddhi.enable.distributed.processing">false</property>
+    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+  </siddhiConfiguration>
+  <importedStreams>
+    <stream as="lbStats3" name="stratos.lb.stats" version="1.0.0"/>
+  </importedStreams>
+  <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id; 
+from lbStats3#window.stratos:secondDerivative(1 min, in_flight_requests)
+select cluster_id, in_flight_requests as count insert into second_derivative_of_requests_stats partition by lbStats1Partition;]]></queryExpressions>
+  <exportedStreams>
+    <stream name="second_derivative_of_requests_stats"
+      valueOf="second_derivative_of_requests_stats" version="1.0.0"/>
+  </exportedStreams>
+</executionPlan>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml b/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
new file mode 100644
index 0000000..8cc5e89
--- /dev/null
+++ b/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
@@ -0,0 +1,3 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<inputEventAdaptor name="DefaultWSO2EventInputAdaptor"
+  statistics="disable" trace="enable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"/>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml b/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
new file mode 100644
index 0000000..a687546
--- /dev/null
+++ b/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<inputEventAdaptor name="InstanceStatsInputAdaptor" statistics="disable"
+  trace="disable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
+  <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
+  <property name="transport.jms.SubscriptionDurable">false</property>
+  <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
+  <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
+  <property name="transport.jms.DestinationType">topic</property>
+</inputEventAdaptor>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml b/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
new file mode 100644
index 0000000..59ba20d
--- /dev/null
+++ b/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<outputEventAdaptor name="DefaultWSO2EventOutputAdaptor"
+  statistics="disable" trace="disable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager">
+  <property name="username">admin</property>
+  <property name="receiverURL">tcp://localhost:7661</property>
+  <property name="password">admin</property>
+  <property name="authenticatorURL">ssl://localhost:7761</property>
+</outputEventAdaptor>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml b/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
new file mode 100644
index 0000000..bb4d6d7
--- /dev/null
+++ b/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<outputEventAdaptor name="JMSOutputAdaptor" statistics="disable"
+  trace="enable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
+  <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
+  <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
+  <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
+  <property name="transport.jms.DestinationType">topic</property>
+</outputEventAdaptor>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/stratos-cep-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/pom.xml b/tools/cep_artifacts/stratos-cep-extensions/pom.xml
new file mode 100644
index 0000000..e765538
--- /dev/null
+++ b/tools/cep_artifacts/stratos-cep-extensions/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  #  Licensed to the Apache Software Foundation (ASF) under one
+  #  or more contributor license agreements.  See the NOTICE file
+  #  distributed with this work for additional information
+  #  regarding copyright ownership.  The ASF licenses this file
+  #  to you under the Apache License, Version 2.0 (the
+  #  "License"); you may not use this file except in compliance
+  #  with the License.  You may obtain a copy of the License at
+  #
+  #  http://www.apache.org/licenses/LICENSE-2.0
+  #
+  #  Unless required by applicable law or agreed to in writing,
+  #  software distributed under the License is distributed on an
+  #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  #  KIND, either express or implied.  See the License for the
+  #  specific language governing permissions and limitations
+  #  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.stratos</groupId>
+    <artifactId>org.apache.stratos.cep.extension</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+
+    <repositories>
+        <repository>
+            <id>wso2-maven2-repository</id>
+            <name>WSO2 Maven2 Repository</name>
+            <url>http://dist.wso2.org/maven2</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-core</artifactId>
+            <version>2.0.0-wso2v4</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+        </plugins>
+
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
new file mode 100644
index 0000000..6ad1e87
--- /dev/null
+++ b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@SiddhiExtension(namespace = "stratos", function = "gradient")
+public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class);
+    private ScheduledExecutorService eventRemoverScheduler;
+    private long timeToKeep;
+    private int subjectedAttrIndex;
+    private Attribute.Type subjectedAttrType;
+    private List<InEvent> newEventList;
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        acquireLock();
+        try {
+            System.out.println(listEvent);
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+
+    @Override
+	public void run() {
+		acquireLock();
+		try {
+			long scheduledTime = System.currentTimeMillis();
+			try {
+				oldEventList.clear();
+				while (true) {
+					threadBarrier.pass();
+					RemoveEvent removeEvent = (RemoveEvent) window.poll();
+					if (removeEvent == null) {
+						if (oldEventList.size() > 0) {
+							nextProcessor.process(new RemoveListEvent(
+							                                          oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+							oldEventList.clear();
+						}
+
+						if (newEventList.size() > 0) {
+							InEvent[] inEvents =
+							                     newEventList.toArray(new InEvent[newEventList.size()]);
+							for (InEvent inEvent : inEvents) {
+								window.put(new RemoveEvent(inEvent, -1));
+							}
+							
+							InEvent[] gradientEvents = gradient(inEvents[0], inEvents[newEventList.size() - 1]);
+							
+							for (InEvent inEvent : gradientEvents) {
+	                            window.put(new RemoveEvent(inEvent, -1));
+                            }
+							nextProcessor.process(new InListEvent(gradientEvents));
+
+							newEventList.clear();
+						}
+
+						long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+						if (diff > 0) {
+							try {
+								eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+							} catch (RejectedExecutionException ex) {
+								log.warn("scheduling cannot be accepted for execution: elementID " +
+								         elementId);
+							}
+							break;
+						}
+						scheduledTime = System.currentTimeMillis();
+					} else {
+						oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+					}
+				}
+			} catch (Throwable t) {
+				log.error(t.getMessage(), t);
+			}
+		} finally {
+			releaseLock();
+		}
+	}
+
+
+    /**
+     * This function will calculate the linear gradient (per second) of the events received during
+     * a specified time period.
+     */
+	private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) {
+		double firstVal = 0.0, lastVal = 0.0;
+		// FIXME I'm not sure whether there's some other good way to do correct casting,
+		// based on the type.
+		if (Type.DOUBLE.equals(subjectedAttrType)) {
+			firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
+		} else if (Type.INT.equals(subjectedAttrType)) {
+			firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
+		} else if (Type.LONG.equals(subjectedAttrType)) {
+			firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
+		} else if (Type.FLOAT.equals(subjectedAttrType)) {
+			firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
+		}
+		
+		long t1 = firstInEvent.getTimeStamp();
+		long t2 = lastInEvent.getTimeStamp();
+		long tGap = t2 - t1;
+		double gradient = 0.0;
+		if (tGap > 0) {
+			gradient = ((lastVal - firstVal) * 1000) / tGap;
+		}
+		log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+		          " First val: " + firstVal + " Time Gap: " + tGap );
+		Object[] data = firstInEvent.getData().clone();
+		data[subjectedAttrIndex] = gradient;
+		InEvent gradientEvent =
+		                        new InEvent(firstInEvent.getStreamId(), (t1+t2)/2,
+		                                    data);
+		InEvent[] output = new InEvent[1];
+		output[0] = gradientEvent;
+		return output;
+	}
+
+	@Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    @Override
+    protected void restoreState(Object[] data) {
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        oldEventList = ((ArrayList<RemoveEvent>) data[1]);
+        newEventList = ((ArrayList<InEvent>) data[2]);
+        window.reSchedule();
+    }
+
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+        
+        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+        } else {
+            newEventList = new ArrayList<InEvent>();
+        }
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+
+    }
+
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    @Override
+    public void destroy(){
+    	oldEventList = null;
+    	newEventList = null;
+    	window = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e1cecd1a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
new file mode 100644
index 0000000..4fb7018
--- /dev/null
+++ b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@SiddhiExtension(namespace = "stratos", function = "secondDerivative")
+public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
+    private ScheduledExecutorService eventRemoverScheduler;
+    private long timeToKeep;
+    private int subjectedAttrIndex;
+    private Attribute.Type subjectedAttrType;
+    private List<InEvent> newEventList;
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        acquireLock();
+        try {
+            System.out.println(listEvent);
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+
+    @Override
+	public void run() {
+		acquireLock();
+		try {
+			long scheduledTime = System.currentTimeMillis();
+			try {
+				oldEventList.clear();
+				while (true) {
+					threadBarrier.pass();
+					RemoveEvent removeEvent = (RemoveEvent) window.poll();
+					if (removeEvent == null) {
+						if (oldEventList.size() > 0) {
+							nextProcessor.process(new RemoveListEvent(
+							                                          oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+							oldEventList.clear();
+						}
+
+						if (newEventList.size() > 0) {
+							InEvent[] inEvents =
+							                     newEventList.toArray(new InEvent[newEventList.size()]);
+							for (InEvent inEvent : inEvents) {
+								window.put(new RemoveEvent(inEvent, -1));
+							}
+							
+							// in order to find second derivative, we need at least 3 events.
+							if (newEventList.size() > 2) {
+
+								InEvent firstDerivative1 =
+								                           gradient(inEvents[0],
+								                                    inEvents[(newEventList.size() / 2) - 1],
+								                                    null)[0];
+								InEvent firstDerivative2 =
+								                           gradient(inEvents[newEventList.size() / 2],
+								                                    inEvents[newEventList.size() - 1],
+								                                    null)[0];
+								InEvent[] secondDerivative =
+								                             gradient(firstDerivative1,
+								                                      firstDerivative2, Type.DOUBLE);
+
+								for (InEvent inEvent : secondDerivative) {
+									window.put(new RemoveEvent(inEvent, -1));
+								}
+								nextProcessor.process(new InListEvent(secondDerivative));
+							} else {
+								log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " +
+								          newEventList.size());
+							}
+
+							newEventList.clear();
+						}
+
+						long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+						if (diff > 0) {
+							try {
+								eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+							} catch (RejectedExecutionException ex) {
+								log.warn("scheduling cannot be accepted for execution: elementID " +
+								         elementId);
+							}
+							break;
+						}
+						scheduledTime = System.currentTimeMillis();
+					} else {
+						oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+					}
+				}
+			} catch (Throwable t) {
+				log.error(t.getMessage(), t);
+			}
+		} finally {
+			releaseLock();
+		}
+	}
+
+
+    /**
+     * This function will calculate the linear gradient (per second) of the events received during
+     * a specified time period.
+     */
+	private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) {
+		Type attrType = type == null ? subjectedAttrType : type;
+		double firstVal = 0.0, lastVal = 0.0;
+		// FIXME I'm not sure whether there's some other good way to do correct casting,
+		// based on the type.
+		if (Type.DOUBLE.equals(attrType)) {
+			firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
+		} else if (Type.INT.equals(attrType)) {
+			firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
+		} else if (Type.LONG.equals(attrType)) {
+			firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
+		} else if (Type.FLOAT.equals(attrType)) {
+			firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
+			lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
+		}
+		
+		long t1 = firstInEvent.getTimeStamp();
+		long t2 = lastInEvent.getTimeStamp();
+		long tGap = t2 - t1;
+		double gradient = 0.0;
+		if (tGap > 0) {
+			gradient = ((lastVal - firstVal) * 1000) / tGap;
+		}
+		log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+		          " First val: " + firstVal + " Time Gap: " + tGap );
+		Object[] data = firstInEvent.getData().clone();
+		data[subjectedAttrIndex] = gradient;
+		InEvent gradientEvent =
+		                        new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2),
+		                                    data);
+		InEvent[] output = new InEvent[1];
+		output[0] = gradientEvent;
+		return output;
+	}
+
+	@Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    @Override
+    protected void restoreState(Object[] data) {
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        oldEventList = ((ArrayList<RemoveEvent>) data[1]);
+        newEventList = ((ArrayList<InEvent>) data[2]);
+        window.reSchedule();
+    }
+
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+        
+        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+        } else {
+            newEventList = new ArrayList<InEvent>();
+        }
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+
+    }
+
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    @Override
+    public void destroy(){
+    	oldEventList = null;
+    	newEventList = null;
+    	window = null;
+    }
+}