You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/08/30 20:17:35 UTC

[42/50] [abbrv] stratos git commit: Fixing pom modules structure. Make it consistent

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml
new file mode 100644
index 0000000..0fa4a0a
--- /dev/null
+++ b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfHealthRequest.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<executionPlan name="SecondDerivativeOfHealthRequest"
+  statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventprocessor"> <!-- Derives second-derivative streams for memory_consumption and load_average health stats, once per cluster partition key and once per member. -->
+  <description>This will find the second derivative of health stats over a minute.</description>
+  <siddhiConfiguration>
+    <property name="siddhi.enable.distributed.processing">false</property>
+    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+  </siddhiConfiguration>
+  <importedStreams> <!-- cartridge_agent_health_stats 1.0.0 is referenced in the queries below as health_second_der_request -->
+    <stream as="health_second_der_request" name="cartridge_agent_health_stats" version="1.0.0"/>
+  </importedStreams>
+  <queryExpressions><![CDATA[ 
+	 from health_second_der_request
+		select member_id, cluster_id, cluster_instance_id, network_partition_id, health_description, value,
+		stratos:concat(cluster_id, '-' ,cluster_instance_id) as health_second_der_cluster_network
+		insert into health_second_der_concat;
+	define partition health_second_der_cluster_partition  by health_second_der_concat.health_second_der_cluster_network;
+	from health_second_der_concat [health_description == 'memory_consumption'] 
+		#window.stratos:secondDerivative(1 min, value) select cluster_id, cluster_instance_id, network_partition_id, value as second_derivative_memory_consumption
+		insert into second_derivative_memory_consumption_stats 
+		partition by health_second_der_cluster_partition;
+	from health_second_der_concat [health_description == 'load_average'] 
+		#window.stratos:secondDerivative(1 min, value) select cluster_id, cluster_instance_id, network_partition_id, value as second_derivative_load_average
+		insert into second_derivative_load_average_stats 
+		partition by health_second_der_cluster_partition;
+	define partition health_second_der_member_partition by health_second_der_request.member_id;
+	from health_second_der_request [health_description == 'memory_consumption'] 
+		#window.stratos:secondDerivative(1 min, value) 
+		select member_id, cluster_id, cluster_instance_id, network_partition_id, value as member_second_derivative_memory_consumption
+		insert into member_second_derivative_memory_consumption_stats 
+		partition by health_second_der_member_partition;
+	from health_second_der_request [health_description == 'load_average'] 
+		#window.stratos:secondDerivative(1 min, value)
+		select member_id, cluster_id, cluster_instance_id, network_partition_id, value as member_second_derivative_load_average
+		 insert into member_second_derivative_load_average_stats 
+		partition by health_second_der_member_partition;]]></queryExpressions> <!-- Query flow: (1) add a key built by stratos:concat from cluster_id, '-' and cluster_instance_id; (2) per key, filter by health_description and apply the 1 min stratos:secondDerivative window; (3) repeat the same two filters partitioned by member_id on the raw input. NOTE(review): the key alias ends in cluster_network although it is built from cluster_instance_id; confirm the naming is intentional. -->
+  <exportedStreams> <!-- each stream written by the queries above is exported under the same name, version 1.0.0 -->
+    <stream name="second_derivative_memory_consumption_stats"
+      valueOf="second_derivative_memory_consumption_stats" version="1.0.0"/>
+    <stream name="second_derivative_load_average_stats"
+      valueOf="second_derivative_load_average_stats" version="1.0.0"/>
+    <stream name="member_second_derivative_memory_consumption_stats"
+      valueOf="member_second_derivative_memory_consumption_stats" version="1.0.0"/>
+    <stream name="member_second_derivative_load_average_stats"
+      valueOf="member_second_derivative_load_average_stats" version="1.0.0"/>
+  </exportedStreams>
+</executionPlan>

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
new file mode 100644
index 0000000..c8e4ed5
--- /dev/null
+++ b/extensions/cep/modules/artifacts/executionplans/SecondDerivativeOfRequestsInFlightFinder.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<executionPlan name="SecondDerivativeOfRequestsInFlightFinder"
+  statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventprocessor"> <!-- Derives the second_derivative_in_flight_requests stream from in_flight_requests. NOTE(review): trace is enabled here but disabled in SecondDerivativeOfHealthRequest; confirm this is intended for a shipped artifact. -->
+  <description>This will find the second derivative of the number of requests in flight over a minute.</description>
+  <siddhiConfiguration>
+    <property name="siddhi.enable.distributed.processing">false</property>
+    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
+  </siddhiConfiguration>
+  <importedStreams> <!-- in_flight_requests 1.0.0 is referenced in the queries below as second_der_rif -->
+    <stream as="second_der_rif" name="in_flight_requests" version="1.0.0"/>
+  </importedStreams>
+  <queryExpressions><![CDATA[ 
+	from second_der_rif
+		select cluster_id, cluster_instance_id, network_partition_id, in_flight_request_count,
+		stratos:concat(cluster_id, '-' ,network_partition_id) as rif_second_der_cluster_network
+		insert into rif_second_der_concat;
+	define partition rif_second_der_cluster_partition by rif_second_der_concat.rif_second_der_cluster_network;
+	from rif_second_der_concat#window.stratos:secondDerivative(1 min, in_flight_request_count)
+		select cluster_id, cluster_instance_id, network_partition_id,in_flight_request_count as count
+		insert into second_derivative_in_flight_requests 
+		partition by rif_second_der_cluster_partition;
+  ]]></queryExpressions> <!-- Query flow: (1) add a key built by stratos:concat from cluster_id, '-' and network_partition_id; (2) per key, apply the 1 min stratos:secondDerivative window and emit in_flight_request_count as count. NOTE(review): the key uses network_partition_id while SecondDerivativeOfHealthRequest keys on cluster_instance_id; confirm which granularity is intended. -->
+  <exportedStreams> <!-- the query output stream is exported under the same name, version 1.0.0 -->
+    <stream name="second_derivative_in_flight_requests"
+            valueOf="second_derivative_in_flight_requests" version="1.0.0"/>
+  </exportedStreams>
+</executionPlan>

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml b/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
new file mode 100644
index 0000000..77af463
--- /dev/null
+++ b/extensions/cep/modules/artifacts/inputeventadaptors/DefaultWSO2EventInputAdaptor.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<inputEventAdaptor name="DefaultWSO2EventInputAdaptor"
+  statistics="disable" trace="enable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"/> <!-- Input event adaptor of type wso2event with no extra properties; statistics are disabled and tracing is enabled. -->

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml b/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
new file mode 100644
index 0000000..4438d2c
--- /dev/null
+++ b/extensions/cep/modules/artifacts/outputeventadaptors/DefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<outputEventAdaptor name="DefaultWSO2EventOutputAdaptor"
+  statistics="disable" trace="disable" type="wso2event" xmlns="http://wso2.org/carbon/eventadaptormanager"> <!-- Output event adaptor of type wso2event; statistics and tracing are both disabled. -->
+  <property name="username">admin</property>
+  <property name="receiverURL">tcp://localhost:7661</property> <!-- receiver endpoint is hard-coded to localhost over tcp -->
+  <property name="password">admin</property> <!-- NOTE(review): username and password are hard-coded as admin/admin; presumably defaults to be overridden per deployment; confirm -->
+  <property name="authenticatorURL">ssl://localhost:7761</property> <!-- authenticator endpoint is hard-coded to localhost over ssl -->
+</outputEventAdaptor>

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml b/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml
new file mode 100644
index 0000000..59c3653
--- /dev/null
+++ b/extensions/cep/modules/artifacts/outputeventadaptors/JMSOutputAdaptor.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<outputEventAdaptor name="JMSOutputAdaptor" statistics="disable"
+  trace="enable" type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager"> <!-- Output event adaptor of type jms targeting a topic destination; statistics are disabled and tracing is enabled. -->
+  <!--property name="java.naming.provider.url">CEP_HOME/repository/conf/jndi.properties</property-->
+  <property name="java.naming.provider.url">tcp://localhost:61616</property> <!-- NOTE(review): provider URL is hard-coded to localhost:61616; presumably changed when the broker runs on another host; confirm -->
+  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property> <!-- JNDI initial context factory class from ActiveMQ -->
+  <property name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</property>
+  <property name="transport.jms.DestinationType">topic</property>
+</outputEventAdaptor>

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml
new file mode 100644
index 0000000..4c4c7e0
--- /dev/null
+++ b/extensions/cep/modules/artifacts/streamdefinitions/stream-manager-config.xml
@@ -0,0 +1,309 @@
+<?xml version='1.0'?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<streamManagerConfiguration xmlns="http://wso2.org/carbon/streammanager"> <!-- Stream definitions (all version 1.0.0) for the raw input streams and the derived average / gradient / second derivative streams; every definition has empty metaData and correlationData and carries its attributes in payloadData. -->
+    <!-- in-flight requests stream definitions start -->
+    <streamDefinition name="in_flight_requests" version="1.0.0">
+        <description>in-flight request count</description>
+        <nickName>in-flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="in_flight_request_count" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="gradient_in_flight_requests" version="1.0.0">
+        <description>gradient of in flight request count</description>
+        <nickName>gradient in flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="count" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="average_in_flight_requests" version="1.0.0">
+        <description>average of in-flight request count</description>
+        <nickName>average in-flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="count" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="second_derivative_in_flight_requests" version="1.0.0">
+        <description>second derivative of in-flight request count</description>
+        <nickName>second derivative in-flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="count" type="double"/>
+        </payloadData>
+    </streamDefinition>
+    <!-- in-flight requests stream definitions end -->
+
+    <!-- cartridge agent health stats stream definitions start -->
+    <streamDefinition name="cartridge_agent_health_stats" version="1.0.0">
+        <description>agent health stats</description>
+        <nickName>agent health stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_id" type="String"/>
+            <property name="partition_id" type="String"/>
+            <property name="health_description" type="String"/>
+            <property name="value" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="average_load_average_stats" version="1.0.0">
+        <description>average load average stats</description>
+        <nickName>average load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="average_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="average_memory_consumption_stats" version="1.0.0">
+        <description>average memory consumption stats</description>
+        <nickName>average memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="average_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="gradient_load_average_stats" version="1.0.0">
+        <description>gradient load average stats</description>
+        <nickName>gradient load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="gradient_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="gradient_memory_consumption_stats" version="1.0.0">
+        <description>gradient memory consumption stats</description>
+        <nickName>gradient memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="gradient_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="second_derivative_memory_consumption_stats" version="1.0.0">
+        <description>second derivative memory consumption stats</description>
+        <nickName>second derivative memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="second_derivative_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="second_derivative_load_average_stats" version="1.0.0">
+        <description>second derivative load average stats</description>
+        <nickName>second derivative load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="second_derivative_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="fault_message" version="1.0.0">
+        <description>fault message</description>
+        <nickName>fault message</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_id" type="String"/>
+            <property name="partition_id" type="String"/>
+        </payloadData>
+    </streamDefinition>
+    <!-- cartridge agent health stats stream definitions end -->
+
+    <!-- member-level (grouped by member_id) stream definitions start -->
+    <streamDefinition name="member_average_load_average_stats" version="1.0.0">
+        <description>average load average stats</description>
+        <nickName>average load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_average_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="member_average_memory_consumption_stats" version="1.0.0">
+        <description>average memory consumption stats</description>
+        <nickName>average memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_average_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="member_gradient_load_average_stats" version="1.0.0">
+        <description>gradient load average stats</description>
+        <nickName>gradient load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_gradient_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="member_gradient_memory_consumption_stats" version="1.0.0">
+        <description>gradient memory consumption stats</description>
+        <nickName>gradient memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_gradient_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="member_second_derivative_memory_consumption_stats" version="1.0.0">
+        <description>second derivative memory consumption stats</description>
+        <nickName>second derivative memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_second_derivative_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
+
+    <streamDefinition name="member_second_derivative_load_average_stats" version="1.0.0">
+        <description>second derivative load average stats</description>
+        <nickName>second derivative load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_second_derivative_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
+    <!-- member-level stream definitions end -->
+</streamManagerConfiguration>

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/README.md
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/distribution/README.md b/extensions/cep/modules/distribution/README.md
new file mode 100755
index 0000000..ebf6bf2
--- /dev/null
+++ b/extensions/cep/modules/distribution/README.md
@@ -0,0 +1,12 @@
+# Apache Stratos CEP Extensions
+
+Apache Stratos Complex Event Processor (CEP) extensions include Window Processors for processing
+health statistic events. These extensions are bundled with the Stratos binary distribution. In a
+distributed deployment where CEP runs externally, these extensions need to be deployed manually.
+
+Please refer to the link below for more information on WSO2 CEP.
+http://wso2.com/products/complex-event-processor/
+
+
+Thank you for using Apache Stratos!
+The Stratos Team
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/distribution/pom.xml b/extensions/cep/modules/distribution/pom.xml
new file mode 100644
index 0000000..1fe52f0
--- /dev/null
+++ b/extensions/cep/modules/distribution/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.stratos</groupId>
+        <artifactId>cep-extensions</artifactId>
+        <version>4.1.2</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>apache-stratos-cep-artifacts</artifactId>
+    <packaging>pom</packaging>
+    <name>Apache Stratos CEP artifacts</name>
+    <description>Apache Stratos CEP artifacts</description>
+    <!-- This module only runs the assembly described by src/assembly/bin.xml during the package phase. -->
+    <profiles>
+        <profile>
+            <id>default</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>4-dist</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>single</goal> <!-- lifecycle-bound replacement for the deprecated 'attached' goal, which maven-assembly-plugin 3.0.0 removed -->
+                                </goals>
+                                <configuration>
+                                    <descriptors>
+                                        <descriptor>${project.basedir}/src/assembly/bin.xml</descriptor>
+                                    </descriptors>
+                                    <appendAssemblyId>false</appendAssemblyId> <!-- keep the descriptor id out of the produced file name -->
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/src/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/distribution/src/assembly/bin.xml b/extensions/cep/modules/distribution/src/assembly/bin.xml
new file mode 100755
index 0000000..509a6e2
--- /dev/null
+++ b/extensions/cep/modules/distribution/src/assembly/bin.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<assembly>
+    <!-- Assembly descriptor with id "bin": packages the CEP artifacts as a single zip archive. -->
+    <id>bin</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+    <!-- No implicit base directory is added; every entry below names its own
+         ${project.artifactId}-${project.version} output directory instead. -->
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <!-- Collects the XML files found under the sibling "artifacts" directory. -->
+            <directory>../artifacts/</directory>
+            <outputDirectory>${project.artifactId}-${project.version}</outputDirectory>
+            <includes>
+                <include>**/*.xml</include>
+            </includes>
+            <!-- Leaves out the pom.xml at the root of that directory and anything below its src directory. -->
+            <excludes>
+                <exclude>pom.xml</exclude>
+                <exclude>src/**</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+    <!-- Individual files placed next to the artifacts. Each one is marked as filtered
+         and is given file mode 644. -->
+    <files>
+        <file>
+            <source>${project.basedir}/README.md</source>
+            <outputDirectory>${project.artifactId}-${project.version}</outputDirectory>
+            <filtered>true</filtered>
+            <fileMode>644</fileMode>
+        </file>
+        <file>
+            <source>src/main/notice/NOTICE</source>
+            <outputDirectory>${project.artifactId}-${project.version}</outputDirectory>
+            <filtered>true</filtered>
+            <fileMode>644</fileMode>
+        </file>
+        <file>
+            <source>src/main/license/LICENSE</source>
+            <outputDirectory>${project.artifactId}-${project.version}</outputDirectory>
+            <filtered>true</filtered>
+            <fileMode>644</fileMode>
+        </file>
+    </files>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/src/main/license/LICENSE
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/distribution/src/main/license/LICENSE b/extensions/cep/modules/distribution/src/main/license/LICENSE
new file mode 100644
index 0000000..25202d8
--- /dev/null
+++ b/extensions/cep/modules/distribution/src/main/license/LICENSE
@@ -0,0 +1,204 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+===================================================================================

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/distribution/src/main/notice/NOTICE
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/distribution/src/main/notice/NOTICE b/extensions/cep/modules/distribution/src/main/notice/NOTICE
new file mode 100644
index 0000000..46ddddd
--- /dev/null
+++ b/extensions/cep/modules/distribution/src/main/notice/NOTICE
@@ -0,0 +1,7 @@
+Apache Stratos CEP Extensions
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+================================================================================
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/pom.xml b/extensions/cep/modules/stratos-cep-extension/pom.xml
new file mode 100644
index 0000000..1905ed4
--- /dev/null
+++ b/extensions/cep/modules/stratos-cep-extension/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  #  Licensed to the Apache Software Foundation (ASF) under one
+  #  or more contributor license agreements.  See the NOTICE file
+  #  distributed with this work for additional information
+  #  regarding copyright ownership.  The ASF licenses this file
+  #  to you under the Apache License, Version 2.0 (the
+  #  "License"); you may not use this file except in compliance
+  #  with the License.  You may obtain a copy of the License at
+  #
+  #  http://www.apache.org/licenses/LICENSE-2.0
+  #
+  #  Unless required by applicable law or agreed to in writing,
+  #  software distributed under the License is distributed on an
+  #  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  #  KIND, either express or implied.  See the License for the
+  #  specific language governing permissions and limitations
+  #  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- Inherits from the cep-extensions parent POM, resolved two directories up. -->
+    <parent>
+        <groupId>org.apache.stratos</groupId>
+        <artifactId>cep-extensions</artifactId>
+        <version>4.1.2</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <!-- Module holding the Stratos CEP extension classes (package org.apache.stratos.cep.extension). -->
+    <artifactId>org.apache.stratos.cep.extension</artifactId>
+    <name>Apache Stratos - CEP Extensions</name>
+    <description>Apache Stratos CEP Extensions</description>
+
+    <!-- Additional repository declared by this module; presumably needed to resolve the
+         org.wso2.siddhi dependency below (TODO confirm).
+         NOTE(review): the URL uses plain HTTP; confirm whether an HTTPS endpoint should be used. -->
+    <repositories>
+        <repository>
+            <id>wso2-maven2-repository</id>
+            <name>WSO2 Maven2 Repository</name>
+            <url>http://dist.wso2.org/maven2</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <!-- Siddhi core, pinned to a fixed version in this module.
+             NOTE(review): version is hard-coded here rather than taken from a property; confirm this is intended. -->
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-core</artifactId>
+            <version>2.0.0-wso2v5</version>
+        </dependency>
+        <!-- Stratos messaging module, always at the same version as this module. -->
+        <dependency>
+            <groupId>org.apache.stratos</groupId>
+            <artifactId>org.apache.stratos.messaging</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Declared without version or configuration here; presumably both come from
+                 the parent POM (TODO confirm). -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
new file mode 100644
index 0000000..59c70c5
--- /dev/null
+++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cep.extension;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * CEP Topology Receiver for Fault Handling Window Processor.
+ * <p>
+ * Registers three topology listeners that maintain the member time stamp map
+ * of the supplied {@link FaultHandlingWindowProcessor}:
+ * <ul>
+ * <li>complete topology: loads the map from the topology until one load reports success;</li>
+ * <li>member terminated: removes the member from the map;</li>
+ * <li>member activated: adds the member with the current time unless it is already present.</li>
+ * </ul>
+ */
+public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+
+    private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
+
+    private final FaultHandlingWindowProcessor faultHandler;
+
+    /**
+     * Creates the receiver and registers its topology event listeners.
+     *
+     * @param faultHandler processor whose member time stamp map is maintained by this receiver
+     */
+    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+        this.faultHandler = faultHandler;
+        addEventListeners();
+    }
+
+    /**
+     * Delegates to the superclass implementation and then logs that the receiver was started.
+     */
+    @Override
+    public void execute() {
+        super.execute();
+        log.info("CEP topology event receiver thread started");
+    }
+
+    /**
+     * Registers the complete topology, member terminated and member activated listeners.
+     */
+    private void addEventListeners() {
+        // Load member time stamp map from the topology as a one time task
+        addEventListener(new CompleteTopologyEventListener() {
+            // Stays false until loadTimeStampMapFromTopology reports success, so a failed
+            // or unsuccessful load is attempted again on the next complete topology event.
+            private boolean initialized;
+
+            @Override
+            protected void onEvent(Event event) {
+                if (initialized) {
+                    return;
+                }
+                // Acquire the lock before entering the try block so that the finally
+                // clause only ever releases a lock that was actually obtained.
+                TopologyManager.acquireReadLock();
+                try {
+                    log.debug("Complete topology event received to fault handling window processor.");
+                    CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
+                    initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
+                } catch (Exception e) {
+                    log.error("Error loading member time stamp map from complete topology event.", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        // Remove member from the time stamp map when MemberTerminated event is received.
+        addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
+                }
+            }
+        });
+
+        // Add member to time stamp map whenever member is activated
+        addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                // do not put this member if we have already received a health event
+                faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
+                        System.currentTimeMillis());
+                if (log.isDebugEnabled()) {
+                    log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
+                }
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java
new file mode 100644
index 0000000..699f036
--- /dev/null
+++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/ConcatWindowProcessor.java
@@ -0,0 +1,54 @@
+/*
+ *     Licensed to the Apache Software Foundation (ASF) under one
+ *     or more contributor license agreements.  See the NOTICE file
+ *     distributed with this work for additional information
+ *     regarding copyright ownership.  The ASF licenses this file
+ *     to you under the Apache License, Version 2.0 (the
+ *     "License"); you may not use this file except in compliance
+ *     with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing,
+ *     software distributed under the License is distributed on an
+ *     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *     KIND, either express or implied.  See the License for the
+ *     specific language governing permissions and limitations
+ *     under the License.
+ */
+package org.apache.stratos.cep.extension;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+/**
+ * Siddhi function executor registered under namespace {@code stratos} with
+ * function name {@code concat}. It turns its argument(s) into a single string.
+ */
+@SiddhiExtension(namespace = "stratos", function = "concat")
+public class ConcatWindowProcessor extends FunctionExecutor {
+    Attribute.Type returnType = Attribute.Type.STRING;
+
+    /**
+     * No initialization is required for this function.
+     */
+    @Override
+    public void init(Attribute.Type[] types, SiddhiContext siddhiContext) {
+    }
+
+    /**
+     * Concatenates the string forms of the given value(s).
+     *
+     * @param obj either an {@code Object[]} whose elements are appended in order,
+     *            or a single value
+     * @return the concatenated string; a {@code null} value (the argument itself or
+     *         an array element) is rendered as the text {@code "null"}
+     */
+    @Override
+    protected Object process(Object obj) {
+        if (obj instanceof Object[]) {
+            // Method-local buffer, so the unsynchronized StringBuilder is sufficient.
+            StringBuilder sb = new StringBuilder();
+            for (Object aObj : (Object[]) obj) {
+                sb.append(aObj);
+            }
+            return sb.toString();
+        }
+        // String.valueOf avoids a NullPointerException on a null argument and
+        // matches how null array elements are rendered above.
+        return String.valueOf(obj);
+    }
+
+    /**
+     * No resources to release.
+     */
+    @Override
+    public void destroy() {
+    }
+
+    /**
+     * @return the attribute type produced by this function ({@code STRING} by default)
+     */
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
new file mode 100644
index 0000000..0aa01ed
--- /dev/null
+++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.MessagingUtil;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * CEP window processor to handle faulty member instances. This window processor is responsible for
+ * publishing a MemberFault event when no health statistics event has been recorded for a member
+ * for more than {@code TIME_OUT} milliseconds.
+ *
+ * Window parameters (see {@code init}): the first is the delay in milliseconds between two
+ * fault-detection runs, the second is the stream attribute that carries the member id.
+ */
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+
+    // Maximum time, in milliseconds, a member may stay silent before it is reported as faulty
+    private static final int TIME_OUT = 60 * 1000;
+    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
+    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
+
+    private ExecutorService executorService;
+    private ScheduledExecutorService faultHandleScheduler;
+    private ScheduledFuture<?> lastSchedule;
+    private ThreadBarrier threadBarrier;
+    // Delay, in milliseconds, between two fault-detection runs (first window parameter)
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private EventPublisher healthStatPublisher =
+            EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
+    // Published payload: event class name -> memberFaultEventMessageMap (wired up in init)
+    private Map<String, Object> memberFaultEventMap = new HashMap<String, Object>();
+    // Inner payload: "message" -> the MemberFaultEvent currently being published
+    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+    // Map of member id's to their last received health event time stamp
+    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+    // Event receiver to receive topology events published by cloud-controller
+    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+    // Stratos member id attribute index in stream execution plan
+    private int memberIdAttrIndex;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Add new entry to time stamp map from the received event.
+     * Events without a member id, or for a member that is not in the topology, are rejected.
+     *
+     * @param event Event received by Siddhi.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[memberIdAttrIndex];
+        // Validate the id first: getMemberFromId() also returns null for an empty id, which
+        // previously made this warning unreachable.
+        if (StringUtils.isEmpty(id)) {
+            log.warn("NULL member id found in the event received. Event rejected.");
+            return;
+        }
+        // Check whether this member is in the topology.
+        // Sometimes there can be a delay between publishing member terminated events
+        // and actually terminating instances. Hence CEP might get events for already terminated
+        // members, so we check the topology for the member's existence.
+        Member member = getMemberFromId(id);
+        if (null == member) {
+            log.debug("Member not found in the topology. Event rejected");
+            return;
+        }
+        memberTimeStampMap.put(id, event.getTimeStamp());
+        if (log.isDebugEnabled()) {
+            log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns an iterator over the window. The predicate is only applied when distributed
+     * processing is enabled; otherwise it is ignored and the plain iterator is returned.
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     * Retrieve the current activated members from the topology and initialize the timestamp map.
+     * This will allow the system to recover from a restart.
+     * Members that already have a time stamp keep it; only missing entries are added.
+     *
+     * @param topology Topology model object
+     * @return false if the topology or its service collection is null, true otherwise
+     */
+    boolean loadTimeStampMapFromTopology(Topology topology) {
+
+        long currentTimeStamp = System.currentTimeMillis();
+        if (topology == null || topology.getServices() == null) {
+            return false;
+        }
+        // TODO make this efficient by adding APIs to messaging component
+        for (Service service : topology.getServices()) {
+            if (service.getClusters() != null) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() != null) {
+                        for (Member member : cluster.getMembers()) {
+                            // we are checking faulty status only in previously activated members
+                            if (member != null && MemberStatus.Active.equals(member.getStatus())) {
+                                // Initialize the member time stamp map from the topology at the beginning
+                                memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " +
+                    memberTimeStampMap);
+        }
+        return true;
+    }
+
+    /**
+     * Looks up a member in the topology held by {@link TopologyManager}.
+     *
+     * @param memberId id of the member to find
+     * @return the matching member, or null if the id is empty, the topology is not initialized,
+     *         no member matches, or reading the topology fails (the failure is logged)
+     */
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId)) {
+            return null;
+        }
+        if (!TopologyManager.getTopology().isInitialized()) {
+            return null;
+        }
+        // Acquire outside the try block so that the finally clause only ever releases a lock
+        // that was actually obtained.
+        TopologyManager.acquireReadLock();
+        try {
+            if (TopologyManager.getTopology().getServices() == null) {
+                return null;
+            }
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                if (service.getClusters() != null) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (cluster.getMembers() != null) {
+                            for (Member member : cluster.getMembers()) {
+                                if (member != null && memberId.equals(member.getMemberId())) {
+                                    return member;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Pass the exception as the throwable so that the stack trace is logged.
+            log.error("Error while reading topology", e);
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+        return null;
+    }
+
+    /**
+     * Publishes a MemberFaultEvent for the given member through the health stat publisher.
+     * Nothing is published (a warning is logged) if the member is not found in the topology.
+     *
+     * @param memberId id of the member detected as faulty
+     */
+    private void publishMemberFault(String memberId) {
+        Member member = getMemberFromId(memberId);
+        if (member == null) {
+            log.warn("Failed to publish member fault event. Member having [member-id] " + memberId +
+                    " does not exist in topology");
+            return;
+        }
+        log.info("Publishing member fault event for [member-id] " + memberId);
+
+        // NOTE(review): the meaning of the trailing 0 argument is not visible here - confirm
+        // against the MemberFaultEvent constructor.
+        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
+                member.getMemberId(), member.getPartitionId(),
+                member.getNetworkPartitionId(), 0);
+
+        // memberFaultEventMap references memberFaultEventMessageMap (see init), so updating the
+        // inner map is enough to change the published payload.
+        memberFaultEventMessageMap.put("message", memberFaultEvent);
+        healthStatPublisher.publish(memberFaultEventMap, true);
+    }
+
+    /**
+     * One fault-detection pass: reports every member whose last recorded time stamp is older than
+     * {@code TIME_OUT} milliseconds, then re-schedules itself after {@code timeToKeep} milliseconds,
+     * even when the pass failed.
+     */
+    @Override
+    public void run() {
+        try {
+            threadBarrier.pass();
+
+            for (Map.Entry<String, Long> entry : memberTimeStampMap.entrySet()) {
+                long currentTime = System.currentTimeMillis();
+                String memberId = entry.getKey();
+                Long eventTimeStamp = entry.getValue();
+
+                if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                    log.info("Faulty member detected [member-id] " + memberId + " with [last time-stamp] " +
+                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+                    publishMemberFault(memberId);
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+                        memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
+            }
+        } catch (Throwable t) {
+            log.error(t.getMessage(), t);
+        } finally {
+            if (lastSchedule != null) {
+                lastSchedule.cancel(false);
+            }
+            lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState()};
+    }
+
+    @Override
+    protected void restoreState(Object[] data) {
+        // NOTE(review): the window is restored twice, first with the wrapper array produced by
+        // currentState() and then with the window state it wraps. The first call looks redundant;
+        // left unchanged until confirmed against the Siddhi queue implementation.
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the window parameters, creates the scheduler queue, starts the topology event receiver
+     * and schedules the first fault-detection run.
+     *
+     * @param parameters parameters[0] is the run interval in milliseconds (int or long constant),
+     *                   parameters[1] is the variable naming the member id attribute
+     * @param streamDefinition definition used to resolve the position of the member id attribute
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
+        memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
+
+        // this.siddhiContext / this.async are the inherited fields, not the method parameters;
+        // presumably the base class assigns them before calling init - TODO confirm.
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        memberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+
+        executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
+                CEP_EXTENSION_THREAD_POOL_SIZE);
+        cepTopologyEventReceiver.setExecutorService(executorService);
+        cepTopologyEventReceiver.execute();
+
+        //Ordinary scheduling
+        window.schedule();
+        if (log.isDebugEnabled()) {
+            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        }
+    }
+
+    /**
+     * Replaces any pending run with one that fires after {@code timeToKeep} milliseconds.
+     */
+    @Override
+    public void schedule() {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Replaces any pending run with one that fires immediately.
+     */
+    @Override
+    public void scheduleNow() {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.faultHandleScheduler = scheduledExecutorService;
+    }
+
+    @Override
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /**
+     * Cancels the pending fault-detection run, terminates the topology event receiver and shuts
+     * down the extension's executor service.
+     */
+    @Override
+    public void destroy() {
+        // Cancel the pending run so that it does not fire against a destroyed processor.
+        // A run that is already executing is not interrupted.
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+
+        // terminate topology listener thread
+        cepTopologyEventReceiver.terminate();
+        window = null;
+
+        // Shutdown executor service
+        if (executorService != null) {
+            try {
+                executorService.shutdownNow();
+            } catch (Exception e) {
+                log.warn("An error occurred while shutting down cep extension executor service", e);
+            }
+        }
+    }
+
+    /**
+     * @return the live (not copied) map of member ids to their last recorded time stamp
+     */
+    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+        return memberTimeStampMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
new file mode 100644
index 0000000..dff0f79
--- /dev/null
+++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/GradientFinderWindowProcessor.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.event.remove.RemoveEvent;
+import org.wso2.siddhi.core.event.remove.RemoveListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Siddhi window processor (namespace "stratos", function "gradient"). Incoming events are
+ * buffered; on every scheduled run the processor emits a single event whose subjected attribute
+ * holds the per-second gradient between the first and the last buffered event.
+ *
+ * Window parameters (see {@code init}): the first is the run interval in milliseconds, the second
+ * is the stream attribute whose gradient is calculated.
+ */
+@SiddhiExtension(namespace = "stratos", function = "gradient")
+public class GradientFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    static final Logger log = Logger.getLogger(GradientFinderWindowProcessor.class);
+    private ScheduledExecutorService eventRemoverScheduler;
+    private ScheduledFuture<?> lastSchedule;
+    // Interval, in milliseconds, between two runs (first window parameter)
+    private long timeToKeep;
+    // Position and declared type of the attribute whose gradient is calculated
+    private int subjectedAttrIndex;
+    private Attribute.Type subjectedAttrType;
+    // Events received since the last run
+    private List<InEvent> newEventList;
+    // Events polled from the window during the current run, pending expiry notification
+    private List<RemoveEvent> oldEventList;
+    private ThreadBarrier threadBarrier;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        acquireLock();
+        try {
+            newEventList.add(event);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        acquireLock();
+        try {
+            // Diagnostics go through the logger; the previous System.out.println wrote every
+            // list event to standard output while holding the lock.
+            if (log.isDebugEnabled()) {
+                log.debug("List event received: " + listEvent);
+            }
+            for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+                newEventList.add((InEvent) listEvent.getEvent(i));
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns an iterator over the window. The predicate is only applied when distributed
+     * processing is enabled; otherwise it is ignored and the plain iterator is returned.
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    /**
+     * Scheduled pass. Drains the window, forwards the drained events as expired, moves the
+     * buffered events into the window together with the gradient event computed from them,
+     * forwards that gradient event, and re-schedules itself for the rest of the interval.
+     * If the pass took longer than {@code timeToKeep}, it is repeated immediately.
+     */
+    @Override
+    public void run() {
+        acquireLock();
+        try {
+            long scheduledTime = System.currentTimeMillis();
+            try {
+                oldEventList.clear();
+                while (true) {
+                    threadBarrier.pass();
+                    RemoveEvent removeEvent = (RemoveEvent) window.poll();
+                    if (removeEvent != null) {
+                        // Still draining: remember the event, stamped with the current time.
+                        oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
+                        continue;
+                    }
+
+                    // Window is empty: notify the next processor about the drained events.
+                    if (oldEventList.size() > 0) {
+                        nextProcessor.process(new RemoveListEvent(
+                                oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
+                        oldEventList.clear();
+                    }
+
+                    if (newEventList.size() > 0) {
+                        InEvent[] inEvents = newEventList.toArray(new InEvent[newEventList.size()]);
+                        for (InEvent inEvent : inEvents) {
+                            window.put(new RemoveEvent(inEvent, -1));
+                        }
+
+                        // Index the snapshot by its own length rather than re-reading the
+                        // (possibly distributed) list size.
+                        InEvent[] gradientEvents = gradient(inEvents[0], inEvents[inEvents.length - 1]);
+
+                        for (InEvent inEvent : gradientEvents) {
+                            window.put(new RemoveEvent(inEvent, -1));
+                        }
+                        nextProcessor.process(new InListEvent(gradientEvents));
+
+                        newEventList.clear();
+                    }
+
+                    long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
+                    if (diff > 0) {
+                        try {
+                            if (lastSchedule != null) {
+                                lastSchedule.cancel(false);
+                            }
+                            lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
+                        } catch (RejectedExecutionException ex) {
+                            log.warn("scheduling cannot be accepted for execution: elementID " +
+                                    elementId);
+                        }
+                        break;
+                    }
+                    // The interval has already elapsed: start the next pass right away.
+                    scheduledTime = System.currentTimeMillis();
+                }
+            } catch (Throwable t) {
+                log.error(t.getMessage(), t);
+            }
+        } finally {
+            releaseLock();
+        }
+    }
+
+    /**
+     * This function will calculate the linear gradient (per second) of the events received during
+     * a specified time period.
+     *
+     * @param firstInEvent first event of the period
+     * @param lastInEvent  last event of the period
+     * @return a one-element array holding a copy of the first event whose subjected attribute is
+     *         replaced by the gradient and whose time stamp is the midpoint of both events
+     */
+    private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent) {
+        double firstVal = toDouble(firstInEvent.getData()[subjectedAttrIndex]);
+        double lastVal = toDouble(lastInEvent.getData()[subjectedAttrIndex]);
+
+        long t1 = firstInEvent.getTimeStamp();
+        long t2 = lastInEvent.getTimeStamp();
+        long millisecondsForASecond = 1000;
+        // The gap is clamped to at least one second, so the divisor is always positive.
+        long tGap = Math.max(t2 - t1, millisecondsForASecond);
+        double gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap;
+        if (log.isDebugEnabled()) {
+            log.debug("Gradient: " + gradient + " Last val: " + lastVal +
+                    " First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+
+                    t2+" hash: "+this.hashCode());
+        }
+        Object[] data = firstInEvent.getData().clone();
+        data[subjectedAttrIndex] = gradient;
+        InEvent gradientEvent = new InEvent(firstInEvent.getStreamId(), (t1 + t2) / 2, data);
+        return new InEvent[]{gradientEvent};
+    }
+
+    /**
+     * Converts an attribute value to double according to the declared attribute type.
+     *
+     * @param value value of the subjected attribute
+     * @return the numeric value for DOUBLE, INT, LONG and FLOAT attributes; 0.0 for any other type
+     */
+    private double toDouble(Object value) {
+        if (Type.DOUBLE.equals(subjectedAttrType) || Type.INT.equals(subjectedAttrType)
+                || Type.LONG.equals(subjectedAttrType) || Type.FLOAT.equals(subjectedAttrType)) {
+            // Every supported boxed type is a Number, so a per-type cast is unnecessary.
+            return ((Number) value).doubleValue();
+        }
+        return 0.0;
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState(), oldEventList, newEventList};
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void restoreState(Object[] data) {
+        // NOTE(review): the window is restored twice, first with the wrapper array produced by
+        // currentState() and then with the window state it wraps. The first call looks redundant;
+        // left unchanged until confirmed against the Siddhi queue implementation.
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        // Cast to List, not ArrayList: in distributed mode newEventList is not an ArrayList.
+        oldEventList = (List<RemoveEvent>) data[1];
+        newEventList = (List<InEvent>) data[2];
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the window parameters, creates the event buffers and the scheduler queue, and
+     * schedules the first run.
+     *
+     * @param parameters parameters[0] is the run interval in milliseconds (int or long constant),
+     *                   parameters[1] is the variable naming the subjected attribute
+     * @param streamDefinition definition used to resolve position and type of that attribute
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
+
+        oldEventList = new ArrayList<RemoveEvent>();
+        // this.siddhiContext / this.async are the inherited fields, not the method parameters;
+        // presumably the base class assigns them before calling init - TODO confirm.
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            newEventList = new ArrayList<InEvent>();
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+        //Ordinary scheduling
+        window.schedule();
+    }
+
+    /**
+     * Replaces any pending run with one that fires after {@code timeToKeep} milliseconds.
+     */
+    @Override
+    public void schedule() {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Replaces any pending run with one that fires immediately.
+     */
+    @Override
+    public void scheduleNow() {
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    @Override
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /**
+     * Cancels the pending run and drops the references to the event buffers and the window.
+     */
+    @Override
+    public void destroy() {
+        // Cancel the pending run so that it does not fire against the cleared buffers.
+        // A run that is already executing is not interrupted.
+        if (lastSchedule != null) {
+            lastSchedule.cancel(false);
+        }
+        oldEventList = null;
+        newEventList = null;
+        window = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d97d9c78/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java
new file mode 100755
index 0000000..0dc24bd
--- /dev/null
+++ b/extensions/cep/modules/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java
@@ -0,0 +1,68 @@
+/*
+ *     Licensed to the Apache Software Foundation (ASF) under one
+ *     or more contributor license agreements.  See the NOTICE file
+ *     distributed with this work for additional information
+ *     regarding copyright ownership.  The ASF licenses this file
+ *     to you under the Apache License, Version 2.0 (the
+ *     "License"); you may not use this file except in compliance
+ *     with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing,
+ *     software distributed under the License is distributed on an
+ *     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *     KIND, either express or implied.  See the License for the
+ *     specific language governing permissions and limitations
+ *     under the License.
+ */
+
+package org.apache.stratos.cep.extension;
+
+/**
+ * Member Request Handling Capability Window Processor
+ */
+
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+/**
+ * Siddhi function extension (namespace "stratos", function "divider") that divides its first
+ * operand by its second.
+ */
+@SiddhiExtension(namespace = "stratos", function = "divider")
+public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor {
+
+    Attribute.Type returnType = Attribute.Type.DOUBLE;
+
+    @Override
+    public void init(Attribute.Type[] types, SiddhiContext siddhiContext) {
+        // Nothing to initialize.
+    }
+
+    /**
+     * Divides the first operand by the second.
+     *
+     * @param obj expected to be an {@code Object[]} holding dividend and divisor; each operand is
+     *            converted through its string form. Operands beyond the second are ignored and
+     *            missing operands count as 0.
+     * @return the quotient as a Double, or 0.0 when the result is NaN or infinite
+     *         (for example when the divisor is zero)
+     * @throws NumberFormatException if an operand's string form is not a parsable double
+     */
+    @Override
+    protected Object process(Object obj) {
+        double[] value = new double[2];
+        if (obj instanceof Object[]) {
+            Object[] operands = (Object[]) obj;
+            // Read at most two operands; an unbounded index overran the array when more were passed.
+            int count = Math.min(operands.length, value.length);
+            for (int i = 0; i < count; i++) {
+                value[i] = Double.parseDouble(String.valueOf(operands[i]));
+            }
+        }
+        // Floating-point division by zero yields NaN or an infinity rather than throwing;
+        // both are mapped to 0.0 below.
+        double unit = value[0] / value[1];
+        if (Double.isNaN(unit) || Double.isInfinite(unit)) {
+            return 0.0;
+        }
+        return unit;
+    }
+
+    @Override
+    public void destroy() {
+        // Nothing to release.
+    }
+
+    /**
+     * @return the attribute type produced by this function, {@link Attribute.Type#DOUBLE} by default
+     */
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+}