You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/06 07:26:27 UTC
git commit: Moved cep related content to
incubator-stratos/extensions/cep
Updated Branches:
refs/heads/master 73d4a873f -> 50e45d8bd
Moved cep related content to incubator-stratos/extensions/cep
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/50e45d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/50e45d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/50e45d8b
Branch: refs/heads/master
Commit: 50e45d8bd4af89e5b89880022f6bf4e839d6423c
Parents: 73d4a87
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Nov 6 11:56:16 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Nov 6 11:56:16 2013 +0530
----------------------------------------------------------------------
.../InstanceStatisticsEventBuilder.xml | 22 ++
.../LoadBalancerStatisticsEventBuilder.xml | 10 +
.../stratos.instance.stats_1.0.0_builder.xml | 10 +
.../AverageRequestsInflightEventFormatter.xml | 11 +
.../eventformatters/GradientEventFormatter.xml | 11 +
.../SecondDerivativeEventFormatter.xml | 11 +
.../AverageRequestsInflightFinder.xml | 19 ++
.../GradientOfRequestsInFlightFinder.xml | 18 ++
...SecondDerivativeOfRequestsInFlightFinder.xml | 19 ++
.../DefaultWSO2EventInputAdaptor.xml | 3 +
.../InstanceStatsInputAdaptor.xml | 9 +
.../DefaultWSO2EventOutputAdaptor.xml | 8 +
.../outputeventadaptors/JMSOutputAdaptor.xml | 8 +
extensions/cep/stratos-cep-extension/pom.xml | 57 ++++
.../GradientFinderWindowProcessor.java | 268 +++++++++++++++++
.../SecondDerivativeFinderWindowProcessor.java | 286 +++++++++++++++++++
.../InstanceStatisticsEventBuilder.xml | 22 --
.../LoadBalancerStatisticsEventBuilder.xml | 10 -
.../stratos.instance.stats_1.0.0_builder.xml | 10 -
.../AverageRequestsInflightEventFormatter.xml | 11 -
.../eventformatters/GradientEventFormatter.xml | 11 -
.../SecondDerivativeEventFormatter.xml | 11 -
.../AverageRequestsInflightFinder.xml | 19 --
.../GradientOfRequestsInFlightFinder.xml | 18 --
...SecondDerivativeOfRequestsInFlightFinder.xml | 19 --
.../DefaultWSO2EventInputAdaptor.xml | 3 -
.../InstanceStatsInputAdaptor.xml | 9 -
.../DefaultWSO2EventOutputAdaptor.xml | 8 -
.../outputeventadaptors/JMSOutputAdaptor.xml | 8 -
.../stratos-cep-extensions/pom.xml | 57 ----
.../GradientFinderWindowProcessor.java | 268 -----------------
.../SecondDerivativeFinderWindowProcessor.java | 286 -------------------
32 files changed, 770 insertions(+), 770 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml b/extensions/cep/artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
new file mode 100644
index 0000000..4b83f73
--- /dev/null
+++ b/extensions/cep/artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventBuilder name="InstanceStatisticsEventBuilder" statistics="disable"
+ trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
+ <from eventAdaptorName="InstanceStatsInputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">instance-stats</property>
+ </from>
+ <mapping customMapping="disable" type="json">
+ <property>
+ <from jsonPath="$.instance.memberId"/>
+ <to name="memberId" type="string"/>
+ </property>
+ <property>
+ <from jsonPath="$.instance.clusterId"/>
+ <to name="clusterId" type="string"/>
+ </property>
+ <property>
+ <from jsonPath="$.instance.loadAverage"/>
+ <to name="loadAverage" type="double"/>
+ </property>
+ </mapping>
+ <to streamName="stratos.instance.stats" version="1.0.0"/>
+</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml b/extensions/cep/artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
new file mode 100644
index 0000000..bef34f9
--- /dev/null
+++ b/extensions/cep/artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventBuilder name="LoadBalancerStatisticsEventBuilder"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventbuilder">
+ <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
+ <property name="stream">stratos.lb.stats</property>
+ <property name="version">1.0.0</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="stratos.lb.stats" version="1.0.0"/>
+</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml b/extensions/cep/artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
new file mode 100644
index 0000000..ed6b588
--- /dev/null
+++ b/extensions/cep/artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventBuilder name="stratos.instance.stats_1.0.0_builder"
+ statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
+ <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
+ <property name="stream">stratos.instance.stats</property>
+ <property name="version">1.0.0</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="stratos.instance.stats" version="1.0.0"/>
+</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml b/extensions/cep/artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
new file mode 100644
index 0000000..19a4f30
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventFormatter name="AverageRequestsInflightEventFormatter"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="aggregated_requests_stats" version="1.0.0"/>
+ <mapping customMapping="enable" type="json">
+ <inline>{"average_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+ </mapping>
+ <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">summarized-health-stats</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/eventformatters/GradientEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/GradientEventFormatter.xml b/extensions/cep/artifacts/eventformatters/GradientEventFormatter.xml
new file mode 100644
index 0000000..e3d9e82
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/GradientEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventFormatter name="GradientEventFormatter" statistics="disable"
+ trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="gradient_stats" version="1.0.0"/>
+ <mapping customMapping="enable" type="json">
+ <inline>{"gradient_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+ </mapping>
+ <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">summarized-health-stats</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/eventformatters/SecondDerivativeEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/SecondDerivativeEventFormatter.xml b/extensions/cep/artifacts/eventformatters/SecondDerivativeEventFormatter.xml
new file mode 100644
index 0000000..ddb1cc4
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/SecondDerivativeEventFormatter.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<eventFormatter name="SecondDerivativeEventFormatter"
+ statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="second_derivative_of_requests_stats" version="1.0.0"/>
+ <mapping customMapping="enable" type="json">
+ <inline>{"second_derivative_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
+ </mapping>
+ <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
+ <property name="transport.jms.Destination">summarized-health-stats</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/executionplans/AverageRequestsInflightFinder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/executionplans/AverageRequestsInflightFinder.xml b/extensions/cep/artifacts/executionplans/AverageRequestsInflightFinder.xml
new file mode 100644
index 0000000..d81d859
--- /dev/null
+++ b/extensions/cep/artifacts/executionplans/AverageRequestsInflightFinder.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<executionPlan name="AverageRequestsInflightFinder" statistics="disable"
+ trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+ <description>This will average the number of requests in flight over a minute.</description>
+ <siddhiConfiguration>
+ <property name="siddhi.enable.distributed.processing">false</property>
+ <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+ </siddhiConfiguration>
+ <importedStreams>
+ <stream as="lbStats1" name="stratos.lb.stats" version="1.0.0"/>
+ </importedStreams>
+ <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
+from lbStats1#window.timeBatch(1 min)
+select cluster_id, avg(in_flight_requests) as count insert into aggregated_requests_stats partition by lbStats1Partition;]]></queryExpressions>
+ <exportedStreams>
+ <stream name="aggregated_requests_stats"
+ valueOf="aggregated_requests_stats" version="1.0.0"/>
+ </exportedStreams>
+</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/executionplans/GradientOfRequestsInFlightFinder.xml b/extensions/cep/artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..7775de0
--- /dev/null
+++ b/extensions/cep/artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<executionPlan name="GradientOfRequestsInFlightFinder"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+ <description>This will find the gradient of the number of requests in flight over a minute.</description>
+ <siddhiConfiguration>
+ <property name="siddhi.enable.distributed.processing">false</property>
+ <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+ </siddhiConfiguration>
+ <importedStreams>
+ <stream as="lbStats2" name="stratos.lb.stats" version="1.0.0"/>
+ </importedStreams>
+ <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
+from lbStats2#window.stratos:gradient(1 min, in_flight_requests)
+select cluster_id, in_flight_requests as count insert into gradient_stats partition by lbStats1Partition;]]></queryExpressions>
+ <exportedStreams>
+ <stream name="gradient_stats" valueOf="gradient_stats" version="1.0.0"/>
+ </exportedStreams>
+</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml b/extensions/cep/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..af2bd0a
--- /dev/null
+++ b/extensions/cep/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<executionPlan name="SecondDerivativeOfRequestsInFlightFinder"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
+ <description>This will find the second derivative of the number of requests in flight over a minute.</description>
+ <siddhiConfiguration>
+ <property name="siddhi.enable.distributed.processing">false</property>
+ <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+ </siddhiConfiguration>
+ <importedStreams>
+ <stream as="lbStats3" name="stratos.lb.stats" version="1.0.0"/>
+ </importedStreams>
+ <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
+from lbStats3#window.stratos:secondDerivative(1 min, in_flight_requests)
+select cluster_id, in_flight_requests as count insert into second_derivative_of_requests_stats partition by lbStats1Partition;]]></queryExpressions>
+ <exportedStreams>
+ <stream name="second_derivative_of_requests_stats"
+ valueOf="second_derivative_of_requests_stats" version="1.0.0"/>
+ </exportedStreams>
+</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml b/extensions/cep/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
new file mode 100644
index 0000000..8cc5e89
--- /dev/null
+++ b/extensions/cep/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
@@ -0,0 +1,3 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<inputEventAdaptor name="DefaultWSO2EventInputAdaptor"
+ statistics="disable" trace="enable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"/>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml b/extensions/cep/artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
new file mode 100644
index 0000000..a687546
--- /dev/null
+++ b/extensions/cep/artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<inputEventAdaptor name="InstanceStatsInputAdaptor" statistics="disable"
+ trace="disable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
+ <property name="transport.jms.SubscriptionDurable">false</property>
+ <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
+ <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
+ <property name="transport.jms.DestinationType">topic</property>
+</inputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml b/extensions/cep/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
new file mode 100644
index 0000000..59ba20d
--- /dev/null
+++ b/extensions/cep/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<outputEventAdaptor name="DefaultWSO2EventOutputAdaptor"
+ statistics="disable" trace="disable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="username">admin</property>
+ <property name="receiverURL">tcp://localhost:7661</property>
+ <property name="password">admin</property>
+ <property name="authenticatorURL">ssl://localhost:7761</property>
+</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/artifacts/outputeventadaptors/JMSOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/outputeventadaptors/JMSOutputAdaptor.xml b/extensions/cep/artifacts/outputeventadaptors/JMSOutputAdaptor.xml
new file mode 100644
index 0000000..bb4d6d7
--- /dev/null
+++ b/extensions/cep/artifacts/outputeventadaptors/JMSOutputAdaptor.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<outputEventAdaptor name="JMSOutputAdaptor" statistics="disable"
+ trace="enable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
+ <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
+ <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
+ <property name="transport.jms.DestinationType">topic</property>
+</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/stratos-cep-extension/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/pom.xml b/extensions/cep/stratos-cep-extension/pom.xml
new file mode 100644
index 0000000..e765538
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements. See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership. The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing,
+ # software distributed under the License is distributed on an
+ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ # KIND, either express or implied. See the License for the
+ # specific language governing permissions and limitations
+ # under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>org.apache.stratos.cep.extension</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <repositories>
+ <repository>
+ <id>wso2-maven2-repository</id>
+ <name>WSO2 Maven2 Repository</name>
+ <url>http://dist.wso2.org/maven2</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-core</artifactId>
+ <version>2.0.0-wso2v4</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
new file mode 100644
index 0000000..6ad1e87
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Siddhi window extension registered as {@code stratos:gradient}.
+ *
+ * <p>Incoming events are buffered until the scheduled {@link #run()} fires. On each run the
+ * buffered batch is reduced to a single event in which the subjected attribute is replaced by
+ * the linear gradient (per second) between the first and the last event of the batch.
+ *
+ * <p>Parameters, as read by {@code init}: the first is the batch period (used as milliseconds
+ * when scheduling), the second is the attribute whose gradient is calculated.
+ */
+@SiddhiExtension(namespace = "stratos", function = "gradient")
+public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class);
+    /** Executor used to (re)schedule {@link #run()}. */
+    private ScheduledExecutorService eventRemoverScheduler;
+    /** Batch period; passed to the scheduler as milliseconds. */
+    private long timeToKeep;
+    /** Position of the subjected attribute inside an event's data array. */
+    private int subjectedAttrIndex;
+    /** Declared type of the subjected attribute; decides how its value is unboxed. */
+    private Attribute.Type subjectedAttrType;
+    /** Events received since the last run. */
+    private List<InEvent> newEventList;
+    /** Events polled from the window during the current run, waiting to be emitted as expired. */
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    /**
+     * Buffers a single incoming event until the next scheduled run.
+     *
+     * @param event the event to buffer
+     */
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Buffers every active event of the given list event until the next scheduled run.
+     *
+     * @param listEvent the list event whose active events are buffered
+     */
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        // Diagnostics go through the logger rather than stdout, and only when debug is enabled.
+        if (log.isDebugEnabled()) {
+            log.debug("Received list event: " + listEvent);
+        }
+        acquireLock();
+        try {
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * @return an iterator over the events currently held in the window queue
+     */
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns an iterator over the window queue. The predicate is only applied when distributed
+     * processing is enabled; otherwise it is ignored and all events are iterated.
+     *
+     * @param predicate filter passed to the grid-backed queue in distributed mode
+     * @return an iterator over the events held in the window queue
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+
+    /**
+     * Scheduled batch step. While holding the lock it:
+     * <ol>
+     *   <li>drains the window queue, collecting each polled event as an expired event;</li>
+     *   <li>once the queue is empty, emits the expired events as one {@link RemoveListEvent};</li>
+     *   <li>if new events were buffered, stores them in the window, computes the gradient between
+     *       the first and last of them, stores the gradient event in the window as well and emits
+     *       it as an {@link InListEvent};</li>
+     *   <li>re-schedules itself for the remainder of the batch period, or loops again immediately
+     *       when the period has already elapsed.</li>
+     * </ol>
+     * Any failure is logged and does not propagate to the scheduler.
+     */
+    @Override
+    public void run() {
+        acquireLock();
+        try {
+            long scheduledTime = System.currentTimeMillis();
+            try {
+                oldEventList.clear();
+                while (true) {
+                    threadBarrier.pass();
+                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
+                    if (removeEvent == null) {
+                        // Queue drained: flush everything collected as expired in this run.
+                        if (oldEventList.size() > 0) {
+                            nextProcessor.process(new RemoveListEvent(
+                                    oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+                            oldEventList.clear();
+                        }
+
+                        if (newEventList.size() > 0) {
+                            InEvent[] inEvents =
+                                    newEventList.toArray(new InEvent[newEventList.size()]);
+                            for (InEvent inEvent : inEvents) {
+                                window.put(new RemoveEvent(inEvent, -1));
+                            }
+
+                            // Index the snapshot array by its own length rather than the list size.
+                            InEvent[] gradientEvents = gradient(inEvents[0], inEvents[inEvents.length - 1]);
+
+                            for (InEvent inEvent : gradientEvents) {
+                                window.put(new RemoveEvent(inEvent, -1));
+                            }
+                            nextProcessor.process(new InListEvent(gradientEvents));
+
+                            newEventList.clear();
+                        }
+
+                        // Time left in the current batch period after the work done above.
+                        long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+                        if (diff > 0) {
+                            try {
+                                eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+                            } catch (RejectedExecutionException ex) {
+                                log.warn("scheduling cannot be accepted for execution: elementID " +
+                                         elementId);
+                            }
+                            break;
+                        }
+                        // Period already elapsed: start the next batch right away.
+                        scheduledTime = System.currentTimeMillis();
+                    } else {
+                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+                    }
+                }
+            } catch (Throwable t) {
+                log.error(t.getMessage(), t);
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+
+    /**
+     * Calculates the linear gradient (per second) of the subjected attribute between two events.
+     *
+     * <p>Event timestamps are treated as milliseconds. When the two timestamps are not strictly
+     * increasing the gradient is reported as {@code 0.0}. Attribute types other than DOUBLE, INT,
+     * LONG and FLOAT are not read and contribute {@code 0.0} for both values.
+     *
+     * @param firstInEvent first event of the batch
+     * @param lastInEvent  last event of the batch
+     * @return a single-element array holding a copy of the first event's data in which the
+     *         subjected attribute is replaced by the gradient, stamped with the midpoint of the
+     *         two timestamps
+     */
+    private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) {
+        double firstVal = 0.0, lastVal = 0.0;
+        // FIXME I'm not sure whether there's some other good way to do correct casting,
+        // based on the type.
+        if (Type.DOUBLE.equals(subjectedAttrType)) {
+            firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
+            lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
+        } else if (Type.INT.equals(subjectedAttrType)) {
+            firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
+            lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
+        } else if (Type.LONG.equals(subjectedAttrType)) {
+            firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
+            lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
+        } else if (Type.FLOAT.equals(subjectedAttrType)) {
+            firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
+            lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
+        }
+
+        long t1 = firstInEvent.getTimeStamp();
+        long t2 = lastInEvent.getTimeStamp();
+        long tGap = t2 - t1;
+        double gradient = 0.0;
+        if (tGap > 0) {
+            // Multiply by 1000 to convert a per-millisecond slope into a per-second one.
+            gradient = ((lastVal - firstVal) * 1000) / tGap;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+                      " First val: " + firstVal + " Time Gap: " + tGap );
+        }
+        // Work on a copy so the buffered first event is left untouched.
+        Object[] data = firstInEvent.getData().clone();
+        data[subjectedAttrIndex] = gradient;
+        InEvent gradientEvent =
+                new InEvent(firstInEvent.getStreamId(), (t1+t2)/2,
+                            data);
+        InEvent[] output = new InEvent[1];
+        output[0] = gradientEvent;
+        return output;
+    }
+
+    /**
+     * @return snapshot of this processor: the window queue state, the expired-event list and the
+     *         buffered new-event list, in that order
+     */
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    /**
+     * Restores a snapshot produced by {@link #currentState()} and re-schedules the window.
+     *
+     * @param data array of the form {@code {windowState, oldEventList, newEventList}}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void restoreState(Object[] data) {
+        // Only data[0] is the queue's own snapshot; the outer array must not be handed to the queue.
+        window.restoreState((Object[]) data[0]);
+        // Cast to the declared field type: the new-event list is not always an ArrayList
+        // (in distributed mode it is obtained from Hazelcast).
+        oldEventList = (List<RemoveEvent>) data[1];
+        newEventList = (List<InEvent>) data[2];
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the window parameters, creates the event buffers and the window queue (grid-backed
+     * when distributed processing is enabled) and triggers the initial scheduling.
+     *
+     * @param parameters       {@code [0]} batch period as an int or long constant,
+     *                         {@code [1]} variable naming the subjected attribute
+     * @param nextProcessor    downstream element (not read here)
+     * @param streamDefinition definition used to resolve the subjected attribute's position and type
+     * @param elementId        id used to name the distributed list and queue
+     * @param async            not read here; the inherited field is used instead
+     * @param siddhiContext    not read here; the inherited field is used instead
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+        } else {
+            newEventList = new ArrayList<InEvent>();
+        }
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+
+    }
+
+    /** Schedules {@link #run()} to fire once after one full batch period. */
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /** Schedules {@link #run()} to fire with zero delay. */
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @param scheduledExecutorService executor used for all subsequent scheduling of this processor
+     */
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    /**
+     * @param threadBarrier barrier passed on every iteration of {@link #run()}
+     */
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /** Drops the references to the event buffers and the window queue. */
+    @Override
+    public void destroy(){
+        oldEventList = null;
+        newEventList = null;
+        window = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
new file mode 100644
index 0000000..4fb7018
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Siddhi window extension registered as {@code stratos:secondDerivative}.
+ * <p>
+ * Incoming events are buffered in {@code newEventList}. Each time the scheduled task
+ * ({@link #run()}) fires, the buffered batch is split into two halves, a linear gradient
+ * (per second) of the subjected attribute is computed for each half, and the gradient of those
+ * two gradients is emitted downstream as a single event whose subjected attribute carries the
+ * result. A batch needs at least three events; smaller batches produce no output.
+ * <p>
+ * Window parameters: {@code parameters[0]} is the batch period in milliseconds (int or long
+ * constant) and {@code parameters[1]} is the attribute to differentiate.
+ */
+@SiddhiExtension(namespace = "stratos", function = "secondDerivative")
+public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
+    /** Executor this processor re-submits itself to. */
+    private ScheduledExecutorService eventRemoverScheduler;
+    /** Batch period in milliseconds. */
+    private long timeToKeep;
+    /** Position of the subjected attribute inside an event's data array. */
+    private int subjectedAttrIndex;
+    /** Declared type of the subjected attribute in the stream definition. */
+    private Attribute.Type subjectedAttrType;
+    /** Events received since the last batch was processed. */
+    private List<InEvent> newEventList;
+    /** Events polled from the window during the current run, waiting to be emitted as expired. */
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    /**
+     * Buffers a single incoming event until the next scheduled run.
+     */
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * Buffers every active event of the given list event until the next scheduled run.
+     */
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        acquireLock();
+        try {
+            // Was an unconditional System.out.println; keep the trace but route it through the logger.
+            if (log.isDebugEnabled()) {
+                log.debug(listEvent);
+            }
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns an iterator over the window; the predicate is only applied when distributed
+     * processing is enabled (the grid-backed queue is the only one that accepts it here).
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+
+    /**
+     * Scheduled task: drains the window into an expired-events batch, emits it, then turns the
+     * buffered new events into one second-derivative event and re-schedules itself for the
+     * remainder of {@code timeToKeep}. Any failure is logged and not propagated.
+     */
+    @Override
+    public void run() {
+        acquireLock();
+        try {
+            long scheduledTime = System.currentTimeMillis();
+            try {
+                oldEventList.clear();
+                while (true) {
+                    threadBarrier.pass();
+                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
+                    if (removeEvent != null) {
+                        // Still draining the window: stamp the event with its expiry time.
+                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+                        continue;
+                    }
+
+                    // Window is empty: publish whatever expired during this pass.
+                    if (oldEventList.size() > 0) {
+                        nextProcessor.process(new RemoveListEvent(
+                                oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+                        oldEventList.clear();
+                    }
+
+                    if (newEventList.size() > 0) {
+                        InEvent[] inEvents =
+                                newEventList.toArray(new InEvent[newEventList.size()]);
+                        // NOTE(review): the raw input events are queued for expiry although only
+                        // the derivative event is emitted downstream - confirm this is intended.
+                        for (InEvent inEvent : inEvents) {
+                            window.put(new RemoveEvent(inEvent, -1));
+                        }
+
+                        // in order to find second derivative, we need at least 3 events.
+                        if (inEvents.length > 2) {
+                            int half = inEvents.length / 2;
+                            InEvent firstDerivative1 = gradient(inEvents[0], inEvents[half - 1]);
+                            InEvent firstDerivative2 =
+                                    gradient(inEvents[half], inEvents[inEvents.length - 1]);
+                            InEvent[] secondDerivative =
+                                    new InEvent[]{gradient(firstDerivative1, firstDerivative2)};
+
+                            for (InEvent inEvent : secondDerivative) {
+                                window.put(new RemoveEvent(inEvent, -1));
+                            }
+                            nextProcessor.process(new InListEvent(secondDerivative));
+                        } else if (log.isDebugEnabled()) {
+                            log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " +
+                                      inEvents.length);
+                        }
+
+                        newEventList.clear();
+                    }
+
+                    // Re-schedule for whatever is left of the period; if the period has already
+                    // elapsed, loop again immediately with a fresh start time.
+                    long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+                    if (diff > 0) {
+                        try {
+                            eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+                        } catch (RejectedExecutionException ex) {
+                            log.warn("scheduling cannot be accepted for execution: elementID " +
+                                     elementId);
+                        }
+                        break;
+                    }
+                    scheduledTime = System.currentTimeMillis();
+                }
+            } catch (Throwable t) {
+                log.error(t.getMessage(), t);
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+
+    /**
+     * Calculates the linear gradient (per second) of the subjected attribute between two events.
+     * <p>
+     * The gradient is 0 when the second event is not strictly later than the first.
+     *
+     * @param firstInEvent earlier event; its data array is cloned as the template for the result
+     * @param lastInEvent  later event
+     * @return a new event on the same stream, time-stamped at the midpoint of the two inputs,
+     *         whose subjected attribute holds the gradient as a {@code Double}
+     */
+    private InEvent gradient(InEvent firstInEvent, InEvent lastInEvent) {
+        double firstVal = toDouble(firstInEvent.getData()[subjectedAttrIndex]);
+        double lastVal = toDouble(lastInEvent.getData()[subjectedAttrIndex]);
+
+        long t1 = firstInEvent.getTimeStamp();
+        long t2 = lastInEvent.getTimeStamp();
+        long tGap = t2 - t1;
+        double gradient = 0.0;
+        if (tGap > 0) {
+            // Time stamps are in milliseconds; scale to a per-second rate.
+            gradient = ((lastVal - firstVal) * 1000) / tGap;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+                      " First val: " + firstVal + " Time Gap: " + tGap );
+        }
+        Object[] data = firstInEvent.getData().clone();
+        data[subjectedAttrIndex] = gradient;
+        return new InEvent(firstInEvent.getStreamId(), t1 + (tGap / 2), data);
+    }
+
+    /**
+     * Widens any boxed numeric value to {@code double}; anything else (including {@code null})
+     * counts as 0.0, which is what non-numeric attribute types evaluated to before.
+     */
+    private static double toDouble(Object value) {
+        return (value instanceof Number) ? ((Number) value).doubleValue() : 0.0;
+    }
+
+    /** Returns true for the attribute types a gradient can be computed from. */
+    private static boolean isNumeric(Attribute.Type type) {
+        return Type.DOUBLE.equals(type) || Type.FLOAT.equals(type)
+               || Type.INT.equals(type) || Type.LONG.equals(type);
+    }
+
+    /**
+     * Snapshot layout: {@code [window state, oldEventList, newEventList]}.
+     */
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    /**
+     * Restores a snapshot produced by {@link #currentState()} and re-schedules the window.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // element types are fixed by currentState()
+    protected void restoreState(Object[] data) {
+        // Only slot 0 belongs to the window; the whole array must not be handed to it.
+        window.restoreState((Object[]) data[0]);
+        oldEventList = (List<RemoveEvent>) data[1];
+        newEventList = (List<InEvent>) data[2];
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the window parameters, resolves the subjected attribute against the stream
+     * definition, creates the event buffers and the window queue (Hazelcast/grid backed when
+     * distributed processing is enabled, local otherwise) and starts the schedule.
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+        if (!isNumeric(subjectedAttrType)) {
+            log.warn("Attribute " + subjectedAttr + " of type " + subjectedAttrType +
+                     " is not numeric; its values will be treated as 0");
+        }
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+        } else {
+            newEventList = new ArrayList<InEvent>();
+        }
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+
+    }
+
+    /** Submits this processor to run after {@code timeToKeep} milliseconds. */
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /** Submits this processor to run with no delay. */
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /** Drops the references to both event buffers and to the window. */
+    @Override
+    public void destroy(){
+        oldEventList = null;
+        newEventList = null;
+        window = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml b/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
deleted file mode 100644
index 4b83f73..0000000
--- a/tools/cep_artifacts/eventbuilders/InstanceStatisticsEventBuilder.xml
+++ /dev/null
@@ -1,22 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<eventBuilder name="InstanceStatisticsEventBuilder" statistics="disable"
- trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
- <from eventAdaptorName="InstanceStatsInputAdaptor" eventAdaptorType="jms">
- <property name="transport.jms.Destination">instance-stats</property>
- </from>
- <mapping customMapping="disable" type="json">
- <property>
- <from jsonPath="$.instance.memberId"/>
- <to name="memberId" type="string"/>
- </property>
- <property>
- <from jsonPath="$.instance.clusterId"/>
- <to name="clusterId" type="string"/>
- </property>
- <property>
- <from jsonPath="$.instance.loadAverage"/>
- <to name="loadAverage" type="double"/>
- </property>
- </mapping>
- <to streamName="stratos.instance.stats" version="1.0.0"/>
-</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml b/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
deleted file mode 100644
index bef34f9..0000000
--- a/tools/cep_artifacts/eventbuilders/LoadBalancerStatisticsEventBuilder.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<eventBuilder name="LoadBalancerStatisticsEventBuilder"
- statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventbuilder">
- <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
- <property name="stream">stratos.lb.stats</property>
- <property name="version">1.0.0</property>
- </from>
- <mapping customMapping="disable" type="wso2event"/>
- <to streamName="stratos.lb.stats" version="1.0.0"/>
-</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml b/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
deleted file mode 100644
index ed6b588..0000000
--- a/tools/cep_artifacts/eventbuilders/stratos.instance.stats_1.0.0_builder.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<eventBuilder name="stratos.instance.stats_1.0.0_builder"
- statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
- <from eventAdaptorName="DefaultWSO2EventInputAdaptor" eventAdaptorType="wso2event">
- <property name="stream">stratos.instance.stats</property>
- <property name="version">1.0.0</property>
- </from>
- <mapping customMapping="disable" type="wso2event"/>
- <to streamName="stratos.instance.stats" version="1.0.0"/>
-</eventBuilder>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml b/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
deleted file mode 100644
index 19a4f30..0000000
--- a/tools/cep_artifacts/eventformatters/AverageRequestsInflightEventFormatter.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<eventFormatter name="AverageRequestsInflightEventFormatter"
- statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
- <from streamName="aggregated_requests_stats" version="1.0.0"/>
- <mapping customMapping="enable" type="json">
- <inline>{"average_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
- </mapping>
- <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
- <property name="transport.jms.Destination">summarized-health-stats</property>
- </to>
-</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml b/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
deleted file mode 100644
index e3d9e82..0000000
--- a/tools/cep_artifacts/eventformatters/GradientEventFormatter.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<eventFormatter name="GradientEventFormatter" statistics="disable"
- trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
- <from streamName="gradient_stats" version="1.0.0"/>
- <mapping customMapping="enable" type="json">
- <inline>{"gradient_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
- </mapping>
- <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
- <property name="transport.jms.Destination">summarized-health-stats</property>
- </to>
-</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml b/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
deleted file mode 100644
index ddb1cc4..0000000
--- a/tools/cep_artifacts/eventformatters/SecondDerivativeEventFormatter.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<eventFormatter name="SecondDerivativeEventFormatter"
- statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventformatter">
- <from streamName="second_derivative_of_requests_stats" version="1.0.0"/>
- <mapping customMapping="enable" type="json">
- <inline>{"second_derivative_of_requests_in_flight":{"cluster_id":"{{cluster_id}}","value":"{{count}}"}}</inline>
- </mapping>
- <to eventAdaptorName="JMSOutputAdaptor" eventAdaptorType="jms">
- <property name="transport.jms.Destination">summarized-health-stats</property>
- </to>
-</eventFormatter>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml b/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
deleted file mode 100644
index d81d859..0000000
--- a/tools/cep_artifacts/executionplans/AverageRequestsInflightFinder.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<executionPlan name="AverageRequestsInflightFinder" statistics="disable"
- trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
- <description>This will average the number of requests in flight over a minute.</description>
- <siddhiConfiguration>
- <property name="siddhi.enable.distributed.processing">false</property>
- <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
- </siddhiConfiguration>
- <importedStreams>
- <stream as="lbStats1" name="stratos.lb.stats" version="1.0.0"/>
- </importedStreams>
- <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
-from lbStats1#window.timeBatch(1 min)
-select cluster_id, avg(in_flight_requests) as count insert into aggregated_requests_stats partition by lbStats1Partition;]]></queryExpressions>
- <exportedStreams>
- <stream name="aggregated_requests_stats"
- valueOf="aggregated_requests_stats" version="1.0.0"/>
- </exportedStreams>
-</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml b/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
deleted file mode 100644
index 7775de0..0000000
--- a/tools/cep_artifacts/executionplans/GradientOfRequestsInFlightFinder.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<executionPlan name="GradientOfRequestsInFlightFinder"
- statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
- <description>This will find the gradient of the number of requests in flight over a minute.</description>
- <siddhiConfiguration>
- <property name="siddhi.enable.distributed.processing">false</property>
- <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
- </siddhiConfiguration>
- <importedStreams>
- <stream as="lbStats2" name="stratos.lb.stats" version="1.0.0"/>
- </importedStreams>
- <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
-from lbStats2#window.stratos:gradient(1 min, in_flight_requests)
-select cluster_id, in_flight_requests as count insert into gradient_stats partition by lbStats1Partition;]]></queryExpressions>
- <exportedStreams>
- <stream name="gradient_stats" valueOf="gradient_stats" version="1.0.0"/>
- </exportedStreams>
-</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml b/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
deleted file mode 100644
index af2bd0a..0000000
--- a/tools/cep_artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<executionPlan name="SecondDerivativeOfRequestsInFlightFinder"
- statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor">
- <description>This will find the second derivative of the number of requests in flight over a minute.</description>
- <siddhiConfiguration>
- <property name="siddhi.enable.distributed.processing">false</property>
- <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
- </siddhiConfiguration>
- <importedStreams>
- <stream as="lbStats3" name="stratos.lb.stats" version="1.0.0"/>
- </importedStreams>
- <queryExpressions><![CDATA[define partition lbStats1Partition by cluster_id;
-from lbStats3#window.stratos:secondDerivative(1 min, in_flight_requests)
-select cluster_id, in_flight_requests as count insert into second_derivative_of_requests_stats partition by lbStats1Partition;]]></queryExpressions>
- <exportedStreams>
- <stream name="second_derivative_of_requests_stats"
- valueOf="second_derivative_of_requests_stats" version="1.0.0"/>
- </exportedStreams>
-</executionPlan>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml b/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
deleted file mode 100644
index 8cc5e89..0000000
--- a/tools/cep_artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
+++ /dev/null
@@ -1,3 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<inputEventAdaptor name="DefaultWSO2EventInputAdaptor"
- statistics="disable" trace="enable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"/>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml b/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
deleted file mode 100644
index a687546..0000000
--- a/tools/cep_artifacts/inputeventadaptors/InstanceStatsInputAdaptor.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<inputEventAdaptor name="InstanceStatsInputAdaptor" statistics="disable"
- trace="disable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
- <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
- <property name="transport.jms.SubscriptionDurable">false</property>
- <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
- <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
- <property name="transport.jms.DestinationType">topic</property>
-</inputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml b/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
deleted file mode 100644
index 59ba20d..0000000
--- a/tools/cep_artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<outputEventAdaptor name="DefaultWSO2EventOutputAdaptor"
- statistics="disable" trace="disable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager">
- <property name="username">admin</property>
- <property name="receiverURL">tcp://localhost:7661</property>
- <property name="password">admin</property>
- <property name="authenticatorURL">ssl://localhost:7761</property>
-</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml b/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
deleted file mode 100644
index bb4d6d7..0000000
--- a/tools/cep_artifacts/outputeventadaptors/JMSOutputAdaptor.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<outputEventAdaptor name="JMSOutputAdaptor" statistics="disable"
- trace="enable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
- <property name="java.naming.provider.url">/home/ubuntu/packages/cep/wso2cep-3.0.0/repository/conf/jndi.properties</property>
- <property name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</property>
- <property name="transport.jms.ConnectionFactoryJNDIName">topicConnectionfactory</property>
- <property name="transport.jms.DestinationType">topic</property>
-</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/stratos-cep-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/pom.xml b/tools/cep_artifacts/stratos-cep-extensions/pom.xml
deleted file mode 100644
index e765538..0000000
--- a/tools/cep_artifacts/stratos-cep-extensions/pom.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!--
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.apache.stratos</groupId>
- <artifactId>org.apache.stratos.cep.extension</artifactId>
- <version>1.0.0-SNAPSHOT</version>
-
- <repositories>
- <repository>
- <id>wso2-maven2-repository</id>
- <name>WSO2 Maven2 Repository</name>
- <url>http://dist.wso2.org/maven2</url>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>org.wso2.siddhi</groupId>
- <artifactId>siddhi-core</artifactId>
- <version>2.0.0-wso2v4</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- </plugins>
-
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
deleted file mode 100644
index 6ad1e87..0000000
--- a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cep.extension;
-
-import org.apache.log4j.Logger;
-import org.wso2.siddhi.core.config.SiddhiContext;
-import org.wso2.siddhi.core.event.StreamEvent;
-import org.wso2.siddhi.core.event.in.InEvent;
-import org.wso2.siddhi.core.event.in.InListEvent;
-import org.wso2.siddhi.core.event.remove.RemoveEvent;
-import org.wso2.siddhi.core.event.remove.RemoveListEvent;
-import org.wso2.siddhi.core.persistence.ThreadBarrier;
-import org.wso2.siddhi.core.query.QueryPostProcessingElement;
-import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
-import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
-import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
-import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
-import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.constant.IntConstant;
-import org.wso2.siddhi.query.api.expression.constant.LongConstant;
-import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Siddhi window extension, registered as {@code stratos:gradient}, that collects incoming events
- * into batches and, on every scheduled run, emits a single event whose subject attribute is
- * replaced by the linear gradient (per second) between the first and the last event of the batch.
- *
- * <p>Window parameters, as read by {@code init}: {@code parameters[0]} is the batch period in
- * milliseconds (int or long constant); {@code parameters[1]} is the variable naming the attribute
- * to differentiate.
- */
-@SiddhiExtension(namespace = "stratos", function = "gradient")
-public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
-
-    static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class);
-
-    /** Executor on which the periodic {@link #run()} invocations are scheduled. */
-    private ScheduledExecutorService eventRemoverScheduler;
-    /** Batch period in milliseconds (first window parameter). */
-    private long timeToKeep;
-    /** Position of the differentiated attribute inside the event data array. */
-    private int subjectedAttrIndex;
-    /** Declared type of the differentiated attribute; selects the cast used by {@code gradient}. */
-    private Attribute.Type subjectedAttrType;
-    /** Events received since the previous batch run. */
-    private List<InEvent> newEventList;
-    /** Scratch list of the events polled out of the window during a run. */
-    private List<RemoveEvent> oldEventList;
-    private ThreadBarrier threadBarrier;
-    private ISchedulerSiddhiQueue<StreamEvent> window;
-
-    /**
-     * Buffers a single incoming event until the next batch run.
-     */
-    @Override
-    protected void processEvent(InEvent event) {
-        acquireLock();
-        try {
-            newEventList.add(event);
-        } finally {
-            releaseLock();
-        }
-    }
-
-    /**
-     * Buffers every active event of an incoming list event until the next batch run.
-     */
-    @Override
-    protected void processEvent(InListEvent listEvent) {
-        acquireLock();
-        try {
-            // Diagnostics go through the logger; this used to be printed to stdout on every call.
-            if (log.isDebugEnabled()) {
-                log.debug("Received list event: " + listEvent);
-            }
-            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-                newEventList.add((InEvent) listEvent.getEvent(i));
-            }
-        } finally {
-            releaseLock();
-        }
-    }
-
-    @Override
-    public Iterator<StreamEvent> iterator() {
-        return window.iterator();
-    }
-
-    /**
-     * Returns a window iterator; the predicate is only honoured when distributed processing is
-     * enabled, otherwise the plain iterator is returned.
-     */
-    @Override
-    public Iterator<StreamEvent> iterator(String predicate) {
-        if (siddhiContext.isDistributedProcessingEnabled()) {
-            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
-        } else {
-            return window.iterator();
-        }
-    }
-
-    /**
-     * Batch run: drains the window, forwards the drained events as a {@link RemoveListEvent},
-     * then forwards the gradient of the events buffered since the previous run and re-schedules
-     * itself for the remainder of the batch period.
-     *
-     * <p>Any {@link Throwable} raised while processing is logged and ends the run; in that case
-     * no further run is scheduled from here.
-     */
-    @Override
-    public void run() {
-        acquireLock();
-        try {
-            long scheduledTime = System.currentTimeMillis();
-            try {
-                oldEventList.clear();
-                while (true) {
-                    threadBarrier.pass();
-                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
-                    if (removeEvent != null) {
-                        // Still draining the window: remember the event, stamped with "now".
-                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
-                        continue;
-                    }
-
-                    // Window is drained: publish what was removed from it.
-                    if (!oldEventList.isEmpty()) {
-                        nextProcessor.process(new RemoveListEvent(
-                                oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
-                        oldEventList.clear();
-                    }
-
-                    if (!newEventList.isEmpty()) {
-                        InEvent[] inEvents = newEventList.toArray(new InEvent[newEventList.size()]);
-                        for (InEvent inEvent : inEvents) {
-                            window.put(new RemoveEvent(inEvent, -1));
-                        }
-
-                        // Index through the snapshot itself instead of re-querying the list size,
-                        // so the indices always match the array that was actually copied.
-                        InEvent[] gradientEvents = gradient(inEvents[0], inEvents[inEvents.length - 1]);
-
-                        for (InEvent inEvent : gradientEvents) {
-                            window.put(new RemoveEvent(inEvent, -1));
-                        }
-                        nextProcessor.process(new InListEvent(gradientEvents));
-
-                        newEventList.clear();
-                    }
-
-                    // Schedule the next run for whatever is left of the batch period; if the
-                    // period has already elapsed, loop again immediately with a fresh start time.
-                    long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
-                    if (diff > 0) {
-                        try {
-                            eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
-                        } catch (RejectedExecutionException ex) {
-                            log.warn("scheduling cannot be accepted for execution: elementID " +
-                                     elementId);
-                        }
-                        break;
-                    }
-                    scheduledTime = System.currentTimeMillis();
-                }
-            } catch (Throwable t) {
-                log.error(t.getMessage(), t);
-            }
-        } finally {
-            releaseLock();
-        }
-    }
-
-    /**
-     * Calculates the linear gradient (per second) of the subject attribute between two events.
-     *
-     * <p>The attribute is read according to {@code subjectedAttrType}; for types other than
-     * DOUBLE, INT, LONG and FLOAT both values stay {@code 0.0}. When the time gap between the
-     * events is not positive the gradient is reported as {@code 0.0}.
-     *
-     * @param firstInEvent earliest event of the batch; its stream id and data are the template
-     *                     for the output event
-     * @param lastInEvent  latest event of the batch
-     * @return a one-element array holding a copy of the first event whose subject attribute is
-     *         the gradient (stored as a {@code Double}) and whose timestamp is the midpoint of
-     *         the two input timestamps
-     */
-    private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) {
-        double firstVal = 0.0, lastVal = 0.0;
-        // FIXME I'm not sure whether there's some other good way to do correct casting,
-        // based on the type.
-        if (Type.DOUBLE.equals(subjectedAttrType)) {
-            firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
-        } else if (Type.INT.equals(subjectedAttrType)) {
-            firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
-        } else if (Type.LONG.equals(subjectedAttrType)) {
-            firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
-        } else if (Type.FLOAT.equals(subjectedAttrType)) {
-            firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
-        }
-
-        long t1 = firstInEvent.getTimeStamp();
-        long t2 = lastInEvent.getTimeStamp();
-        long tGap = t2 - t1;
-        double gradient = 0.0;
-        if (tGap > 0) {
-            // Timestamps are in milliseconds; scale by 1000 to get a per-second rate.
-            gradient = ((lastVal - firstVal) * 1000) / tGap;
-        }
-        if (log.isDebugEnabled()) {
-            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
-                      " First val: " + firstVal + " Time Gap: " + tGap );
-        }
-        Object[] data = firstInEvent.getData().clone();
-        data[subjectedAttrIndex] = gradient;
-        InEvent gradientEvent =
-                new InEvent(firstInEvent.getStreamId(), (t1+t2)/2,
-                            data);
-        InEvent[] output = new InEvent[1];
-        output[0] = gradientEvent;
-        return output;
-    }
-
-    /**
-     * Snapshot layout: {@code {window state, old event list, new event list}}.
-     */
-    @Override
-    protected Object[] currentState() {
-        return new Object[]{window.currentState(), oldEventList, newEventList};
-    }
-
-    /**
-     * Restores a snapshot produced by {@link #currentState()} and re-schedules the window.
-     *
-     * @param data state array in the layout written by {@link #currentState()}
-     */
-    @Override
-    @SuppressWarnings("unchecked") // element types are fixed by currentState()
-    protected void restoreState(Object[] data) {
-        // Only data[0] is the window's own state. The outer array must not be handed to the
-        // window, because its first slot is not in the layout the window saved.
-        window.restoreState((Object[]) data[0]);
-        // Cast to the interface: the lists are not ArrayLists when init() created them from the
-        // distributed (Hazelcast) branch.
-        oldEventList = (List<RemoveEvent>) data[1];
-        newEventList = (List<InEvent>) data[2];
-        window.reSchedule();
-    }
-
-    /**
-     * Reads the window parameters and creates the event buffers and the scheduler queue.
-     *
-     * <p>NOTE(review): distributed mode and the async flag are read from the {@code siddhiContext}
-     * and {@code async} fields rather than from the arguments — presumably the base class assigns
-     * them before calling this method; confirm.
-     */
-    @Override
-    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
-        if (parameters[0] instanceof IntConstant) {
-            timeToKeep = ((IntConstant) parameters[0]).getValue();
-        } else {
-            timeToKeep = ((LongConstant) parameters[0]).getValue();
-        }
-
-        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
-        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
-        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
-
-        oldEventList = new ArrayList<RemoveEvent>();
-        if (this.siddhiContext.isDistributedProcessingEnabled()) {
-            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
-            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
-        } else {
-            newEventList = new ArrayList<InEvent>();
-            window = new SchedulerSiddhiQueue<StreamEvent>(this);
-        }
-        //Ordinary scheduling
-        window.schedule();
-
-    }
-
-    /**
-     * Schedules the next batch run one full batch period from now.
-     */
-    @Override
-    public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Schedules a batch run with no delay.
-     */
-    public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-        this.eventRemoverScheduler = scheduledExecutorService;
-    }
-
-    public void setThreadBarrier(ThreadBarrier threadBarrier) {
-        this.threadBarrier = threadBarrier;
-    }
-
-    /**
-     * Drops the references to the buffers and the window.
-     */
-    @Override
-    public void destroy(){
-        oldEventList = null;
-        newEventList = null;
-        window = null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/50e45d8b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java b/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
deleted file mode 100644
index 4fb7018..0000000
--- a/tools/cep_artifacts/stratos-cep-extensions/src/main/java/org/apache/stratos/cep/extension/SecondDerivativeFinderWindowProcessor.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cep.extension;
-
-import org.apache.log4j.Logger;
-import org.wso2.siddhi.core.config.SiddhiContext;
-import org.wso2.siddhi.core.event.StreamEvent;
-import org.wso2.siddhi.core.event.in.InEvent;
-import org.wso2.siddhi.core.event.in.InListEvent;
-import org.wso2.siddhi.core.event.remove.RemoveEvent;
-import org.wso2.siddhi.core.event.remove.RemoveListEvent;
-import org.wso2.siddhi.core.persistence.ThreadBarrier;
-import org.wso2.siddhi.core.query.QueryPostProcessingElement;
-import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
-import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
-import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
-import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
-import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-import org.wso2.siddhi.query.api.expression.constant.IntConstant;
-import org.wso2.siddhi.query.api.expression.constant.LongConstant;
-import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Siddhi window extension, registered as {@code stratos:secondDerivative}, that collects incoming
- * events into batches and, on every scheduled run, emits a single event whose subject attribute is
- * replaced by the second derivative of that attribute over the batch. The second derivative is
- * the gradient between the gradients of the first and the second half of the batch.
- *
- * <p>Window parameters, as read by {@code init}: {@code parameters[0]} is the batch period in
- * milliseconds (int or long constant); {@code parameters[1]} is the variable naming the attribute
- * to differentiate.
- */
-@SiddhiExtension(namespace = "stratos", function = "secondDerivative")
-public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
-
-    static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
-
-    /** Executor on which the periodic {@link #run()} invocations are scheduled. */
-    private ScheduledExecutorService eventRemoverScheduler;
-    /** Batch period in milliseconds (first window parameter). */
-    private long timeToKeep;
-    /** Position of the differentiated attribute inside the event data array. */
-    private int subjectedAttrIndex;
-    /** Declared type of the differentiated attribute; default cast used by {@code gradient}. */
-    private Attribute.Type subjectedAttrType;
-    /** Events received since the previous batch run. */
-    private List<InEvent> newEventList;
-    /** Scratch list of the events polled out of the window during a run. */
-    private List<RemoveEvent> oldEventList;
-    private ThreadBarrier threadBarrier;
-    private ISchedulerSiddhiQueue<StreamEvent> window;
-
-    /**
-     * Buffers a single incoming event until the next batch run.
-     */
-    @Override
-    protected void processEvent(InEvent event) {
-        acquireLock();
-        try {
-            newEventList.add(event);
-        } finally {
-            releaseLock();
-        }
-    }
-
-    /**
-     * Buffers every active event of an incoming list event until the next batch run.
-     */
-    @Override
-    protected void processEvent(InListEvent listEvent) {
-        acquireLock();
-        try {
-            // Diagnostics go through the logger; this used to be printed to stdout on every call.
-            if (log.isDebugEnabled()) {
-                log.debug("Received list event: " + listEvent);
-            }
-            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-                newEventList.add((InEvent) listEvent.getEvent(i));
-            }
-        } finally {
-            releaseLock();
-        }
-    }
-
-    @Override
-    public Iterator<StreamEvent> iterator() {
-        return window.iterator();
-    }
-
-    /**
-     * Returns a window iterator; the predicate is only honoured when distributed processing is
-     * enabled, otherwise the plain iterator is returned.
-     */
-    @Override
-    public Iterator<StreamEvent> iterator(String predicate) {
-        if (siddhiContext.isDistributedProcessingEnabled()) {
-            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
-        } else {
-            return window.iterator();
-        }
-    }
-
-    /**
-     * Batch run: drains the window, forwards the drained events as a {@link RemoveListEvent},
-     * then (when at least three events were buffered) forwards the second derivative of the
-     * buffered events and re-schedules itself for the remainder of the batch period.
-     *
-     * <p>Any {@link Throwable} raised while processing is logged and ends the run; in that case
-     * no further run is scheduled from here.
-     */
-    @Override
-    public void run() {
-        acquireLock();
-        try {
-            long scheduledTime = System.currentTimeMillis();
-            try {
-                oldEventList.clear();
-                while (true) {
-                    threadBarrier.pass();
-                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
-                    if (removeEvent != null) {
-                        // Still draining the window: remember the event, stamped with "now".
-                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
-                        continue;
-                    }
-
-                    // Window is drained: publish what was removed from it.
-                    if (!oldEventList.isEmpty()) {
-                        nextProcessor.process(new RemoveListEvent(
-                                oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
-                        oldEventList.clear();
-                    }
-
-                    if (!newEventList.isEmpty()) {
-                        InEvent[] inEvents = newEventList.toArray(new InEvent[newEventList.size()]);
-                        for (InEvent inEvent : inEvents) {
-                            window.put(new RemoveEvent(inEvent, -1));
-                        }
-
-                        // Index through the snapshot itself instead of re-querying the list size,
-                        // so the indices always match the array that was actually copied.
-                        int count = inEvents.length;
-
-                        // in order to find second derivative, we need at least 3 events.
-                        if (count > 2) {
-                            int half = count / 2;
-
-                            // First derivative of each half, read with the attribute's own type.
-                            InEvent firstDerivative1 =
-                                    gradient(inEvents[0], inEvents[half - 1], null)[0];
-                            InEvent firstDerivative2 =
-                                    gradient(inEvents[half], inEvents[count - 1], null)[0];
-                            // gradient() stores its result as a Double, so the derivative
-                            // events must be read back as DOUBLE whatever the original type was.
-                            InEvent[] secondDerivative =
-                                    gradient(firstDerivative1, firstDerivative2, Type.DOUBLE);
-
-                            for (InEvent inEvent : secondDerivative) {
-                                window.put(new RemoveEvent(inEvent, -1));
-                            }
-                            nextProcessor.process(new InListEvent(secondDerivative));
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " +
-                                          count);
-                            }
-                        }
-
-                        newEventList.clear();
-                    }
-
-                    // Schedule the next run for whatever is left of the batch period; if the
-                    // period has already elapsed, loop again immediately with a fresh start time.
-                    long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
-                    if (diff > 0) {
-                        try {
-                            eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
-                        } catch (RejectedExecutionException ex) {
-                            log.warn("scheduling cannot be accepted for execution: elementID " +
-                                     elementId);
-                        }
-                        break;
-                    }
-                    scheduledTime = System.currentTimeMillis();
-                }
-            } catch (Throwable t) {
-                log.error(t.getMessage(), t);
-            }
-        } finally {
-            releaseLock();
-        }
-    }
-
-    /**
-     * Calculates the linear gradient (per second) of the subject attribute between two events.
-     *
-     * <p>For types other than DOUBLE, INT, LONG and FLOAT both values stay {@code 0.0}. When the
-     * time gap between the events is not positive the gradient is reported as {@code 0.0}.
-     *
-     * @param firstInEvent earlier event; its stream id and data are the template for the output
-     * @param lastInEvent  later event
-     * @param type         type to read the subject attribute as, or {@code null} to use the
-     *                     attribute's declared type
-     * @return a one-element array holding a copy of the first event whose subject attribute is
-     *         the gradient (stored as a {@code Double}) and whose timestamp is the midpoint of
-     *         the two input timestamps
-     */
-    private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) {
-        Type attrType = type == null ? subjectedAttrType : type;
-        double firstVal = 0.0, lastVal = 0.0;
-        // FIXME I'm not sure whether there's some other good way to do correct casting,
-        // based on the type.
-        if (Type.DOUBLE.equals(attrType)) {
-            firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
-        } else if (Type.INT.equals(attrType)) {
-            firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
-        } else if (Type.LONG.equals(attrType)) {
-            firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
-        } else if (Type.FLOAT.equals(attrType)) {
-            firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
-            lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
-        }
-
-        long t1 = firstInEvent.getTimeStamp();
-        long t2 = lastInEvent.getTimeStamp();
-        long tGap = t2 - t1;
-        double gradient = 0.0;
-        if (tGap > 0) {
-            // Timestamps are in milliseconds; scale by 1000 to get a per-second rate.
-            gradient = ((lastVal - firstVal) * 1000) / tGap;
-        }
-        if (log.isDebugEnabled()) {
-            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
-                      " First val: " + firstVal + " Time Gap: " + tGap );
-        }
-        Object[] data = firstInEvent.getData().clone();
-        data[subjectedAttrIndex] = gradient;
-        InEvent gradientEvent =
-                new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2),
-                            data);
-        InEvent[] output = new InEvent[1];
-        output[0] = gradientEvent;
-        return output;
-    }
-
-    /**
-     * Snapshot layout: {@code {window state, old event list, new event list}}.
-     */
-    @Override
-    protected Object[] currentState() {
-        return new Object[]{window.currentState(), oldEventList, newEventList};
-    }
-
-    /**
-     * Restores a snapshot produced by {@link #currentState()} and re-schedules the window.
-     *
-     * @param data state array in the layout written by {@link #currentState()}
-     */
-    @Override
-    @SuppressWarnings("unchecked") // element types are fixed by currentState()
-    protected void restoreState(Object[] data) {
-        // Only data[0] is the window's own state. The outer array must not be handed to the
-        // window, because its first slot is not in the layout the window saved.
-        window.restoreState((Object[]) data[0]);
-        // Cast to the interface: the lists are not ArrayLists when init() created them from the
-        // distributed (Hazelcast) branch.
-        oldEventList = (List<RemoveEvent>) data[1];
-        newEventList = (List<InEvent>) data[2];
-        window.reSchedule();
-    }
-
-    /**
-     * Reads the window parameters and creates the event buffers and the scheduler queue.
-     *
-     * <p>NOTE(review): distributed mode and the async flag are read from the {@code siddhiContext}
-     * and {@code async} fields rather than from the arguments — presumably the base class assigns
-     * them before calling this method; confirm.
-     */
-    @Override
-    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
-        if (parameters[0] instanceof IntConstant) {
-            timeToKeep = ((IntConstant) parameters[0]).getValue();
-        } else {
-            timeToKeep = ((LongConstant) parameters[0]).getValue();
-        }
-
-        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
-        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
-        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
-
-        oldEventList = new ArrayList<RemoveEvent>();
-        if (this.siddhiContext.isDistributedProcessingEnabled()) {
-            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
-            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
-        } else {
-            newEventList = new ArrayList<InEvent>();
-            window = new SchedulerSiddhiQueue<StreamEvent>(this);
-        }
-        //Ordinary scheduling
-        window.schedule();
-
-    }
-
-    /**
-     * Schedules the next batch run one full batch period from now.
-     */
-    @Override
-    public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Schedules a batch run with no delay.
-     */
-    public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-        this.eventRemoverScheduler = scheduledExecutorService;
-    }
-
-    public void setThreadBarrier(ThreadBarrier threadBarrier) {
-        this.threadBarrier = threadBarrier;
-    }
-
-    /**
-     * Drops the references to the buffers and the window.
-     */
-    @Override
-    public void destroy(){
-        oldEventList = null;
-        newEventList = null;
-        window = null;
-    }
-}