You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/08/20 08:44:45 UTC

[1/5] stratos git commit: Adding Metering and Monitoring Service Implementation

Repository: stratos
Updated Branches:
  refs/heads/tenant-isolation 4fdd431f0 -> 7575faa30


http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/sparkscript/CCEvent
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/sparkscript/CCEvent b/extensions/das/artifacts/sparkscript/CCEvent
new file mode 100644
index 0000000..b8bb6fd
--- /dev/null
+++ b/extensions/das/artifacts/sparkscript/CCEvent
@@ -0,0 +1,18 @@
+CREATE TEMPORARY TABLE memberstatus
+USING CarbonAnalytics
+OPTIONS (tableName "ORG_APACHE_STRATOS_CLOUD_CONTROLLER");
+
+CREATE TEMPORARY TABLE memberstatusnew
+USING CarbonAnalytics
+OPTIONS (tableName "CLUSTER_MEMBER_NEW",
+         schema "startTime String, endTime String, clusterId STRING, activatedInstanceCount INT, terminatedInstanceCount INT, activeInstanceCount INT");
+
+WITH InstanceCount as
+(select clusterId, count(case when status='Active' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as activatedInstanceCount, count(case when status='Terminated' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as terminatedInstanceCount, (sum(case when status='Active' then 1 else 0 end) - sum(case when status='Terminated' then 1 else 0 end))as activeInstanceCount from memberstatus group by clusterId)
+INSERT INTO table memberstatusnew select time(current_time(null)-60000),time(current_time(null)),clusterId, activatedInstanceCount, terminatedInstanceCount,activeInstanceCount from InstanceCount;
+
+CREATE TEMPORARY TABLE membersnew
+USING CarbonAnalytics
+OPTIONS (tableName "MEMBER_NEW",schema "clusterId STRING, clusterInstanceId STRING, networkId STRING, partitionId STRING, cartridgeType STRING, instanceType STRING, memberId STRING, scalingTime LONG,scalingReason STRING, timeStamp STRING");
+
+INSERT INTO TABLE membersnew select clusterId,clusterInstanceId,networkId,partitionId,cartridgeType,instanceType, memberId,scalingTime,scalingReason,time(timeStamp)as timeStamp FROM memberstatus where status='Created';
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/das/pom.xml b/extensions/das/pom.xml
new file mode 100644
index 0000000..d21d1be
--- /dev/null
+++ b/extensions/das/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>stratos-extensions</artifactId>
+        <groupId>org.apache.stratos</groupId>
+        <version>4.1.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.stratos</groupId> <!-- same value as the parent's groupId declared above -->
+    <artifactId>strats-das-extension</artifactId> <!-- NOTE(review): "strats" looks like a typo for "stratos"; a rename must also update the <parent> of spark-udf/pom.xml -->
+    <packaging>pom</packaging> <!-- no code of its own: this POM only aggregates the modules listed below -->
+    <name>Apache Stratos - DAS Extension</name>
+    <description>Apache Stratos extensions for DAS.</description>
+    <modules>
+        <module>spark-udf</module> <!-- the Spark UDF module under extensions/das/spark-udf -->
+    </modules>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/spark-udf/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/das/spark-udf/pom.xml b/extensions/das/spark-udf/pom.xml
new file mode 100644
index 0000000..ced0f0a
--- /dev/null
+++ b/extensions/das/spark-udf/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>strats-das-extension</artifactId> <!-- must match the artifactId in extensions/das/pom.xml exactly, including its "strats" spelling -->
+        <groupId>org.apache.stratos</groupId>
+        <version>4.1.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.stratos</groupId> <!-- same value as the parent's groupId declared above -->
+    <artifactId>apache-stratos-spark-udf</artifactId> <!-- no <packaging> element is given, so Maven's default (jar) applies -->
+    <name>Apache Stratos - Spark UDF</name>
+    <description>Apache Stratos Spark UDF for DAS</description>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java
----------------------------------------------------------------------
diff --git a/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java
new file mode 100644
index 0000000..0b8f408
--- /dev/null
+++ b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.das.extension.spark.udf;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Time-related user-defined functions (UDFs) for Spark SQL queries.
+ */
+public class TimeUDF {
+    /**
+     * Formats an epoch timestamp as "yyyy-MM-dd HH:mm" using the JVM default time zone.
+     * @param timeStamp epoch time in milliseconds; may be null
+     * @return the formatted date, or null when timeStamp is null (previously a NullPointerException)
+     */
+    public String time(Long timeStamp) {
+        if (timeStamp == null) {
+            return null;
+        }
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(timeStamp));
+    }
+
+    /**
+     * Returns the current wall-clock time as reported by System.currentTimeMillis().
+     *
+     * @param param unused and may be null; NOTE(review): presumably present only so the UDF takes an argument - confirm
+     * @return the current time in milliseconds since the epoch
+     */
+    public long current_time(Integer param) {
+        return System.currentTimeMillis();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/pom.xml b/extensions/pom.xml
index ffbfa8e..1cc9038 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -17,7 +17,8 @@
   ~ specific language governing permissions and limitations
   ~ under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
     <parent>
         <groupId>org.apache.stratos</groupId>
@@ -36,6 +37,7 @@
         <module>cep/stratos-cep-extension</module>
         <module>cep/distribution/</module>
         <module>load-balancer</module>
+        <module>das</module>
     </modules>
 </project>
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
index 56e9164..a8102da 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
@@ -84,7 +84,9 @@ dialect "mvel"
 
                         log.info("[dependency-scale] [scale-up] Partition available, hence trying to spawn an instance to scale up!" );
                         log.debug("[dependency-scale] [scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId );
-                        delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary);
+                        long scalingTime = System.currentTimeMillis();
+                        String scalingReason = "Dependency scaling";
+                        delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime);
                         count++;
                     } else {
                         partitionsAvailable = false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
index 96b60da..4eaab2b 100755
--- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
@@ -84,7 +84,10 @@ dialect "mvel"
 
                 log.info("[min-check] Partition available, hence trying to spawn an instance to fulfil minimum count!" + " [cluster] " + clusterId);
                 log.debug("[min-check] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId);
-                delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary);
+                long scalingTime = System.currentTimeMillis();
+                String scalingReason = "Scaling up to fulfil minimum count, [Cluster Min Members] "+clusterInstanceContext.getMinInstanceCount()+" [Additional instances to be created] " + additionalInstances;
+                delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime);
+
 
                 count++;
             } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
index e6f8f67..3b4a916 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
@@ -164,6 +164,10 @@ dialect "mvel"
                     boolean partitionsAvailable = true;
                     int count = 0;
 
+                    String autoscalingReason = (numberOfRequiredInstances == numberOfInstancesReuquiredBasedOnRif)?"Scaling up due to RIF, [Predicted Value] "+rifPredictedValue+" [Threshold] "+rifThreshold:(numberOfRequiredInstances== numberOfInstancesReuquiredBasedOnMemoryConsumption)?"Scaling up due to MC, [Predicted Value] "+mcPredictedValue+" [Threshold] "+mcThreshold:"Scaling up due to LA, [Predicted Value] "+laPredictedValue+" [Threshold] "+laThreshold;
+                    autoscalingReason += " [Number of required instances] "+numberOfRequiredInstances+" [Cluster Max Members] "+clusterMaxMembers+" [Additional instances to be created] " + additionalInstances;
+
+
                     while(count != additionalInstances && partitionsAvailable){
 
                         ClusterLevelPartitionContext partitionContext = (ClusterLevelPartitionContext) partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray());
@@ -182,7 +186,8 @@ dialect "mvel"
                                 " [laPredictedValue] " + laPredictedValue + " [laThreshold] " + laThreshold);
 
                             log.debug("[scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId );
-                            delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary);
+                            long scalingTime = System.currentTimeMillis();
+                            delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,autoscalingReason,scalingTime);
                             count++;
                         } else {
 


[4/5] stratos git commit: Adding Metering and Monitoring Service Implementation

Posted by ga...@apache.org.
Adding Metering and Monitoring Service Implementation

Conflicts:
	components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
	components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
	components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/54283eda
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/54283eda
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/54283eda

Branch: refs/heads/tenant-isolation
Commit: 54283eda982c5677402b1e4ac895d95c9cccebe7
Parents: 4fdd431
Author: Thanuja <th...@wso2.com>
Authored: Wed Jul 29 18:51:34 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Thu Aug 20 10:58:32 2015 +0530

----------------------------------------------------------------------
 .../client/AutoscalerCloudControllerClient.java |   16 +-
 .../autoscaler/rule/RuleTasksDelegator.java     |   48 +-
 .../publisher/HealthStatisticsNotifier.java     |   10 +-
 .../messaging/topology/TopologyBuilder.java     | 1434 +++++++++---------
 .../impl/CloudControllerServiceImpl.java        |    6 +-
 .../impl/CloudControllerServiceUtil.java        |   15 +-
 .../services/impl/InstanceCreator.java          |   17 +-
 .../publisher/BAMUsageDataPublisher.java        |   44 +-
 .../util/CloudControllerConstants.java          |    4 +
 .../common/constants/StratosConstants.java      |    8 +-
 .../publisher/HealthStatisticsPublisher.java    |    3 +-
 .../publisher/InFlightRequestPublisher.java     |    4 +-
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |    9 +-
 .../cep/WSO2CEPInFlightRequestPublisher.java    |    6 +-
 .../LoadBalancerStatisticsNotifier.java         |    3 +-
 .../publisher/MockHealthStatisticsNotifier.java |    3 +
 .../modules/healthstatspublisher/healthstats.py |    5 +-
 .../HealthStatsEventFormatter.xml               |   30 +
 .../eventformatters/RIFEventFormatter.xml       |   31 +
 .../DASDefaultWSO2EventOutputAdaptor.xml        |   29 +
 .../streamdefinitions/stream-manager-config.xml |  486 +++---
 extensions/das/README.md                        |   10 +
 .../CloudControllerEventReceiver.xml            |   29 +
 .../eventreceivers/HealthStatsEventReceiver.xml |   29 +
 .../eventreceivers/RIFEventReceiver.xml         |   29 +
 .../eventsink/cartridge_agent_health_stats.xml  |   85 ++
 .../artifacts/eventsink/in_flight_requests.xml  |   64 +
 .../org_apache_stratos_cloud_controller.xml     |  211 +++
 .../cartridge_agent_health_stats_1.0.0.json     |   40 +
 .../eventstreams/in_flight_requests_1.0.0.json  |   28 +
 ...g.apache.stratos.cloud.controller_1.0.0.json |  112 ++
 extensions/das/artifacts/sparkscript/CCEvent    |   18 +
 extensions/das/pom.xml                          |   40 +
 extensions/das/spark-udf/pom.xml                |   36 +
 .../das/extension/spark/udf/TimeUDF.java        |   49 +
 extensions/pom.xml                              |    4 +-
 .../src/main/conf/drools/dependent-scaling.drl  |    4 +-
 .../src/main/conf/drools/mincheck.drl           |    5 +-
 .../src/main/conf/drools/scaling.drl            |    7 +-
 39 files changed, 2007 insertions(+), 1004 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index 65e952f..f19531b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -84,7 +84,8 @@ public class AutoscalerCloudControllerClient {
     public synchronized MemberContext startInstance(PartitionRef partition,
                                                     String clusterId, String clusterInstanceId,
                                                     String networkPartitionId, boolean isPrimary,
-                                                    int minMemberCount) throws SpawningException {
+                                                    int minMemberCount, String autoscalingReason,
+                                                    long scalingTime) throws SpawningException {
         try {
             if (log.isInfoEnabled()) {
                 log.info(String.format("Trying to spawn an instance via cloud controller: " +
@@ -115,8 +116,18 @@ public class AutoscalerCloudControllerClient {
             minCountProp.setName(StratosConstants.MIN_COUNT);
             minCountProp.setValue(String.valueOf(minMemberCount));
 
+            Property autoscalingReasonProp = new Property();
+            autoscalingReasonProp.setName(StratosConstants.SCALING_REASON);
+            autoscalingReasonProp.setValue(autoscalingReason);
+
+            Property scalingTimeProp = new Property();
+            scalingTimeProp.setName(StratosConstants.SCALING_TIME);
+            scalingTimeProp.setValue(String.valueOf(scalingTime));
+
             memberContextProps.addProperty(isPrimaryProp);
             memberContextProps.addProperty(minCountProp);
+            memberContextProps.addProperty(autoscalingReasonProp);
+            memberContextProps.addProperty(scalingTimeProp);
             instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
 
             long startTime = System.currentTimeMillis();
@@ -228,7 +239,8 @@ public class AutoscalerCloudControllerClient {
     public void terminateAllInstances(String clusterId) throws RemoteException,
             CloudControllerServiceInvalidClusterExceptionException {
         if (log.isInfoEnabled()) {
-            log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId));
+            log.info(String.format("Terminating all instances of cluster via cloud controller: " +
+                    "[cluster] %s", clusterId));
         }
         long startTime = System.currentTimeMillis();
         stub.terminateInstances(clusterId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index c2d2be6..5c335b1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -49,7 +49,8 @@ public class RuleTasksDelegator {
 
     private static final Log log = LogFactory.getLog(RuleTasksDelegator.class);
 
-    public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) {
+    public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative,
+                                                 int timeInterval) {
         double predictedValue;
 //        s = u * t + 0.5 * a * t * t
         if (log.isDebugEnabled()) {
@@ -176,19 +177,21 @@ public class RuleTasksDelegator {
      * @param clusterId                      Cluster id
      * @param clusterInstanceId              Instance id
      * @param isPrimary                      Is a primary member
+     * @param autoscalingReason              scaling reason for member
+     * @param scalingTime                    scaling time
      */
     public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId,
-                              String clusterInstanceId, boolean isPrimary) {
+                              String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) {
 
         try {
             String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId();
-			String nwPartitionUuid=null;
-	        NetworkPartition[] networkPartitionList= CloudControllerServiceClient.getInstance().getNetworkPartitions();
-	        for(int i=0;i<networkPartitionList.length;i++){
-		        if(networkPartitionList[i].getUuid().equals(nwPartitionId)){
-			        nwPartitionUuid=networkPartitionList[i].getUuid();
-		        }
-	        }
+            String nwPartitionUuid = null;
+            NetworkPartition[] networkPartitionList = CloudControllerServiceClient.getInstance().getNetworkPartitions();
+            for (int i = 0; i < networkPartitionList.length; i++) {
+                if (networkPartitionList[i].getUuid().equals(nwPartitionId)) {
+                    nwPartitionUuid = networkPartitionList[i].getUuid();
+                }
+            }
 
             // Calculate accumulation of minimum counts of all the partition of current network partition
             int minimumCountOfNetworkPartition;
@@ -204,18 +207,18 @@ public class RuleTasksDelegator {
             MemberContext memberContext =
                     AutoscalerCloudControllerClient.getInstance()
                             .startInstance(clusterMonitorPartitionContext.getPartition(),
-                                           clusterId,
-                                           clusterInstanceId,
-                                           nwPartitionUuid,
-                                           isPrimary,
-                                           minimumCountOfNetworkPartition);
+                                    clusterId,
+                                    clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(),
+                                    isPrimary,
+                                    minimumCountOfNetworkPartition, autoscalingReason, scalingTime);
             if (memberContext != null) {
                 ClusterLevelPartitionContext partitionContext = clusterInstanceContext.
                         getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());
                 partitionContext.addPendingMember(memberContext);
                 partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId()));
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(),
+                    log.debug(String.format("Pending member added, [member] %s [partition] %s",
+                            memberContext.getMemberId(),
                             memberContext.getPartition().getId()));
                 }
 
@@ -254,13 +257,14 @@ public class RuleTasksDelegator {
         clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
     }
 
-    public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) {
+    public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId,
+                                                         String instanceId) {
         if (log.isDebugEnabled()) {
             log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId);
         }
         //Notify parent for checking scaling dependencies
         ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-	    clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
+        clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
     }
 
     public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
@@ -277,8 +281,8 @@ public class RuleTasksDelegator {
                 clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
             } else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) {
 
-                log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " +
-                                "[partition] %s [network partition] %s", memberId,
+                log.info(String.format("[scale-down] Moving pending member to termination pending list " +
+                                "[member id] %s " + "[partition] %s [network partition] %s", memberId,
                         clusterMonitorPartitionContext.getPartitionId(),
                         clusterMonitorPartitionContext.getNetworkPartitionId()));
                 clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId);
@@ -289,7 +293,8 @@ public class RuleTasksDelegator {
         }
     }
 
-    public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
+    public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext,
+                                            String memberId) {
         try {
             //calling SM to send the instance notification event.
             if (log.isDebugEnabled()) {
@@ -374,7 +379,8 @@ public class RuleTasksDelegator {
 
                 float memberMemoryConsumptionAverage = memberStatsContext.getMemoryConsumption().getAverage();
                 float memberMemoryConsumptionGredient = memberStatsContext.getMemoryConsumption().getGradient();
-                float memberMemoryConsumptionSecondDerivative = memberStatsContext.getMemoryConsumption().getSecondDerivative();
+                float memberMemoryConsumptionSecondDerivative =
+                        memberStatsContext.getMemoryConsumption().getSecondDerivative();
 
                 double memberPredictedMemoryConsumption = getPredictedValueForNextMinute(memberMemoryConsumptionAverage,
                         memberMemoryConsumptionGredient, memberMemoryConsumptionSecondDerivative, 1);

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
index 74c5156..5ab2ebf 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
@@ -51,7 +51,8 @@ public class HealthStatisticsNotifier implements Runnable {
             File pluginFile = new File(pluginFileName);
             if ((pluginFile != null)
                     && (pluginFile.exists())) {
-                List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, IHealthStatisticsReader.class);
+                List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile,
+                        IHealthStatisticsReader.class);
                 if (!pluginClass.isEmpty()) {
                     try {
                         log.trace("Instantiating new instance of plugin type " + pluginClass);
@@ -63,7 +64,8 @@ public class HealthStatisticsNotifier implements Runnable {
                     }
                 }
             } else {
-                log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : "Doesn't exist"));
+                log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" :
+                        "Doesn't exist"));
             }
         }
         if (this.statsReader == null) {
@@ -95,7 +97,7 @@ public class HealthStatisticsNotifier implements Runnable {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Publishing memory consumption: %f", stats.getMemoryUsage()));
                         }
-                        statsPublisher.publish(
+                        statsPublisher.publish(System.currentTimeMillis(),
                                 CartridgeAgentConfiguration.getInstance().getClusterId(),
                                 CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
                                 CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
@@ -108,7 +110,7 @@ public class HealthStatisticsNotifier implements Runnable {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Publishing load average: %f", stats.getProcessorUsage()));
                         }
-                        statsPublisher.publish(
+                        statsPublisher.publish(System.currentTimeMillis(),
                                 CartridgeAgentConfiguration.getInstance().getClusterId(),
                                 CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
                                 CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),


[2/5] stratos git commit: Adding Metering and Monitoring Service Implementation

Posted by ga...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
index 4015a77..adbe294 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
@@ -65,18 +65,21 @@ public class CloudControllerServiceUtil {
         TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(),
                 memberContext.getClusterId(), memberContext.getNetworkPartitionId(),
                 partitionId, memberContext.getMemberId());
-
+        // Member terminated time (epoch millis), published as the event timestamp
+        Long timeStamp = System.currentTimeMillis();
         // Publish statistics to BAM
         BAMUsageDataPublisher.publish(memberContext.getMemberId(),
                 partitionId,
                 memberContext.getNetworkPartitionId(),
                 memberContext.getClusterId(),
+                memberContext.getClusterInstanceId(),
                 memberContext.getCartridgeType(),
                 MemberStatus.Terminated.toString(),
-                null);
+                timeStamp, null, null, null);
 
         // Remove member context
-        CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId());
+        CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(),
+                memberContext.getMemberId());
 
         // Persist cloud controller context
         CloudControllerContext.getInstance().persist();
@@ -87,7 +90,8 @@ public class CloudControllerServiceUtil {
         return isValid;
     }
 
-    public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException {
+    public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider)
+            throws InvalidPartitionException {
         if (iaasProvider != null) {
             // if this is a IaaS based partition
             Iaas iaas = iaasProvider.getIaas();
@@ -104,7 +108,8 @@ public class CloudControllerServiceUtil {
         }
     }
 
-    public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException {
+    public static boolean validatePartition(Partition partition, IaasProvider iaasProvider)
+            throws InvalidPartitionException {
         validatePartitionAndGetIaasProvider(partition, iaasProvider);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
index 34a7c93..02730cf 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
@@ -27,8 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*;
 import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
 import java.util.concurrent.locks.Lock;
 
@@ -68,7 +66,8 @@ public class InstanceCreator implements Runnable {
             memberContext = startInstance(iaas, memberContext, payload);
 
             if (log.isInfoEnabled()) {
-                log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " +
+                log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s " +
+                                "[instance-id] %s " +
                                 "[default-private-ip] %s [default-public-ip] %s",
                         memberContext.getCartridgeType(), memberContext.getClusterId(),
                         memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(),
@@ -85,15 +84,6 @@ public class InstanceCreator implements Runnable {
             // Update topology
             TopologyBuilder.handleMemberInitializedEvent(memberContext);
 
-            // Publish instance creation statistics to BAM
-            BAMUsageDataPublisher.publish(
-                    memberContext.getMemberId(),
-                    memberContext.getPartition().getUuid(),
-                    memberContext.getNetworkPartitionId(),
-                    memberContext.getClusterId(),
-                    memberContext.getCartridgeType(),
-                    MemberStatus.Initialized.toString(),
-                    memberContext.getInstanceMetadata());
         } catch (Exception e) {
             String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s",
                     memberContext.getCartridgeType(), memberContext.getClusterId());
@@ -105,7 +95,8 @@ public class InstanceCreator implements Runnable {
         }
     }
 
-    private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException {
+    private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws
+            CartridgeNotFoundException {
         memberContext = iaas.startInstance(memberContext, payload);
 
         // Validate instance id

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
index d5aabbd..690bc59 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
@@ -52,12 +52,31 @@ public class BAMUsageDataPublisher {
     private static StreamDefinition streamDefinition;
     private static final String cloudControllerEventStreamVersion = "1.0.0";
 
+    /**
+     * Publish events to BAM
+     *
+     * @param memberId          member id
+     * @param partitionId       partition id
+     * @param networkId         network partition id
+     * @param clusterId         cluster id
+     * @param clusterInstanceId cluster instance id
+     * @param serviceName       service name
+     * @param status            member status
+     * @param timeStamp         event time in milliseconds since the epoch
+     * @param autoscalingReason scaling reason related to member
+     * @param scalingTime       scaling time
+     * @param metadata          meta-data
+     */
     public static void publish(String memberId,
                                String partitionId,
                                String networkId,
                                String clusterId,
+                               String clusterInstanceId,
                                String serviceName,
                                String status,
+                               Long timeStamp,
+                               String autoscalingReason,
+                               Long scalingTime,
                                InstanceMetadata metadata) {
         if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) {
             return;
@@ -79,16 +98,23 @@ public class BAMUsageDataPublisher {
         MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
         String cartridgeType = memberContext.getCartridgeType();
         Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+        String instanceType = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridgeType,
+                partitionId).getProperty(CloudControllerConstants.INSTANCE_TYPE);
 
         //Construct the data to be published
         List<Object> payload = new ArrayList<Object>();
         // Payload values
+        payload.add(timeStamp);
         payload.add(memberId);
         payload.add(serviceName);
         payload.add(clusterId);
+        payload.add(clusterInstanceId);
         payload.add(handleNull(memberContext.getLbClusterId()));
         payload.add(handleNull(partitionId));
         payload.add(handleNull(networkId));
+        payload.add(handleNull(instanceType));
+        payload.add(handleNull(autoscalingReason));
+        payload.add(handleNull(scalingTime));
         if (cartridge != null) {
             payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
         } else {
@@ -129,12 +155,14 @@ public class BAMUsageDataPublisher {
 
         try {
             if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
+                log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(),
+                        streamDefinition.getVersion()));
             }
             dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
         } catch (AgentException e) {
             if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
+                log.error(String.format("Could not publish BAM event: [stream] %s [version] %s",
+                        streamDefinition.getName(), streamDefinition.getVersion()), e);
             }
         }
     }
@@ -151,12 +179,17 @@ public class BAMUsageDataPublisher {
         streamDefinition.setDescription("Instances booted up by the Cloud Controller");
         // Payload definition
         List<Attribute> payloadData = new ArrayList<Attribute>();
+        payloadData.add(new Attribute(CloudControllerConstants.TIME_STAMP, AttributeType.LONG));
         payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.SCALING_REASON, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.SCALING_TIME, AttributeType.LONG));
         payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING));
@@ -210,4 +243,11 @@ public class BAMUsageDataPublisher {
         }
         return val;
     }
+
+    /**
+     * Returns {@code val} unchanged, or -1 when {@code val} is null.
+     */
+    private static Long handleNull(Long val) {
+        return (val == null) ? Long.valueOf(-1L) : val;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index 5e6115f..2cb0c31 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -103,6 +103,7 @@ public final class CloudControllerConstants {
     public static final String MEMBER_ID_COL = "memberId";
     public static final String CARTRIDGE_TYPE_COL = "cartridgeType";
     public static final String CLUSTER_ID_COL = "clusterId";
+    public static final String CLUSTER_INSTANCE_ID_COL = "clusterInstanceId";
     public static final String PARTITION_ID_COL = "partitionId";
     public static final String NETWORK_ID_COL = "networkId";
     public static final String ALIAS_COL = "alias";
@@ -122,6 +123,9 @@ public final class CloudControllerConstants {
     public static final String PRIV_IP_COL = "privateIPAddresses";
     public static final String PUB_IP_COL = "publicIPAddresses";
     public static final String ALLOCATE_IP_COL = "allocateIPAddresses";
+    public static final String TIME_STAMP = "timeStamp";
+    public static final String SCALING_REASON = "scalingReason";
+    public static final String SCALING_TIME = "scalingTime";
 
     /**
      * Properties

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
index 194bd81..55e97d9 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
@@ -91,7 +91,8 @@ public class StratosConstants {
 
     // metering constants
     public static final String THROTTLING_ALL_ACTION = "all_actions";
-    public static final String THROTTLING_IN_DATA_ACTION = "in_data_action"; //this covers registry capacity + registry bandwidth
+    public static final String THROTTLING_IN_DATA_ACTION =
+            "in_data_action"; //this covers registry capacity + registry bandwidth
     public static final String THROTTLING_OUT_DATA_ACTION = "out_data_action"; //this covers registry bandwidth
     public static final String THROTTLING_ADD_USER_ACTION = "add_user_action";
     public static final String THROTTLING_SERVICE_IN_BANDWIDTH_ACTION = "service_in_bandwith_action";
@@ -158,6 +159,8 @@ public class StratosConstants {
     public static final String MAX_CHECK_DROOL_FILE = "maxcheck.drl";
     public static final String OBSOLETE_CHECK_DROOL_FILE = "obsoletecheck.drl";
     public static final String MIN_COUNT = "MIN_COUNT";
+    public static final String SCALING_REASON = "SCALING_REASON";
+    public static final String SCALING_TIME = "SCALING_TIME";
 
     // Policy and definition related constants
     public static final int PUBLIC_DEFINITION = 0;
@@ -165,7 +168,8 @@ public class StratosConstants {
     // member expiry timeout constants
     public static final String PENDING_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingMemberExpiryTimeout";
     public static final String OBSOLETED_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.obsoletedMemberExpiryTimeout";
-    public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout";
+    public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT =
+            "autoscaler.member.pendingTerminationMemberExpiryTimeout";
 
     public static final String FILTER_VALUE_SEPARATOR = ",";
     public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter";

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..95b04ff 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -27,6 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
     /**
      * Publish health statistics to complex event processor.
      *
+     * @param timeStamp          Event time in milliseconds since the epoch
      * @param clusterId          Cluster id of the member
      * @param clusterInstanceId  Cluster instance id of the member
      * @param networkPartitionId Network partition id of the member
@@ -35,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                  String memberId, String partitionId, String health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af9c8e9 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -27,10 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
     /**
      * Publish in-flight request count.
      *
+     * @param timeStamp            Event time in milliseconds since the epoch
      * @param clusterId            Cluster id
      * @param clusterInstanceId    Cluster instance id
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount);
+    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+                 int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index 1dc4240..d5c9265 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -52,6 +52,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
 
             // Set payload definition
             List<Attribute> payloadData = new ArrayList<Attribute>();
+            payloadData.add(new Attribute("time_stamp", AttributeType.LONG));
             payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
             payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
             payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
@@ -70,6 +71,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish health statistics to cep.
      *
+     * @param timeStamp
      * @param clusterId
      * @param clusterInstanceId
      * @param networkPartitionId
@@ -79,13 +81,16 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
      * @param value
      */
     @Override
-    public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) {
+    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+                        String memberId, String partitionId, String health, double value) {
         if (log.isDebugEnabled()) {
-            log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f",
+            log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " +
+                            "[partition] %s [member] %s [health] %s [value] %f",
                     clusterId, networkPartitionId, partitionId, memberId, health, value));
         }
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
+        payload.add(timeStamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 2ed8883..f51eb91 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -51,6 +51,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
             List<Attribute> payloadData = new ArrayList<Attribute>();
 
             // Set payload definition
+            payloadData.add(new Attribute("time_stamp", AttributeType.LONG));
             payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
             payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
             payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
@@ -65,15 +66,18 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish in-flight request count of a cluster.
      *
+     * @param timeStamp
      * @param clusterId
      * @param clusterInstanceId
      * @param networkPartitionId
      * @param inFlightRequestCount
      */
     @Override
-    public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount) {
+    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+                        int inFlightRequestCount) {
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
+        payload.add(timeStamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index dc2233d..1dd12c7 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -81,7 +81,8 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
                         for (Cluster cluster : service.getClusters()) {
                             // Publish in-flight request count of load balancer's network partition
                             int requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
-                            inFlightRequestPublisher.publish(cluster.getClusterId(), clusterInstanceId,
+                            inFlightRequestPublisher.publish(System.currentTimeMillis(), cluster.getClusterId(),
+                                    clusterInstanceId,
                                     networkPartitionId, requestCount);
 
                             if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
index c2d1c6c..0dc5e67 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
@@ -69,6 +69,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
                         mockMemberContext.getMemberId(), memoryConsumption));
             }
             healthStatisticsPublisher.publish(
+                    System.currentTimeMillis(),
                     mockMemberContext.getClusterId(),
                     mockMemberContext.getClusterInstanceId(),
                     mockMemberContext.getNetworkPartitionId(),
@@ -93,6 +94,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
                         mockMemberContext.getMemberId(), loadAvereage));
             }
             healthStatisticsPublisher.publish(
+                    System.currentTimeMillis(),
                     mockMemberContext.getClusterId(),
                     mockMemberContext.getClusterInstanceId(),
                     mockMemberContext.getNetworkPartitionId(),
@@ -116,6 +118,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
                         mockMemberContext.getMemberId(), requestsInFlight));
             }
             inFlightRequestPublisher.publish(
+                    System.currentTimeMillis(),
                     mockMemberContext.getClusterId(),
                     mockMemberContext.getClusterInstanceId(),
                     mockMemberContext.getNetworkPartitionId(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
index 9753c3e..aae9e9d 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
@@ -17,7 +17,7 @@
 
 from threading import Thread
 import multiprocessing
-
+import time
 import psutil
 
 from abstracthealthstatisticspublisher import *
@@ -124,6 +124,7 @@ class HealthStatisticsPublisher:
         stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
 
         # stream_def.add_payloaddata_attribute()
+        stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG)
         stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
@@ -141,6 +142,7 @@ class HealthStatisticsPublisher:
         """
 
         event = ThriftEvent()
+        event.payloadData.append(int(round(time.time() * 1000)))
         event.payloadData.append(self.cartridge_agent_config.cluster_id)
         event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
         event.payloadData.append(self.cartridge_agent_config.network_partition_id)
@@ -159,6 +161,7 @@ class HealthStatisticsPublisher:
         """
 
         event = ThriftEvent()
+        event.payloadData.append(int(round(time.time() * 1000)))
         event.payloadData.append(self.cartridge_agent_config.cluster_id)
         event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
         event.payloadData.append(self.cartridge_agent_config.network_partition_id)

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
new file mode 100644
index 0000000..bcef15f
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventFormatter name="HealthStatsEventFormatter"
+                statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+    <from streamName="cartridge_agent_health_stats" version="1.0.0"/>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event">
+        <property name="stream">cartridge_agent_health_stats</property>
+        <property name="version">1.0.0</property>
+    </to>
+</eventFormatter>

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
new file mode 100644
index 0000000..3cfd4a9
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventFormatter name="RIFEventFormatter" statistics="disable"
+                trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+    <from streamName="in_flight_requests" version="1.0.0"/>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event">
+        <property name="stream">in_flight_requests</property>
+        <property name="version">1.0.0</property>
+    </to>
+</eventFormatter>
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
new file mode 100755
index 0000000..5cec300
--- /dev/null
+++ b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<outputEventAdaptor name="DASDefaultWSO2EventOutputAdaptor"
+                    statistics="disable" trace="disable" type="wso2event"
+                    xmlns="http://wso2.org/carbon/eventadaptormanager">
+    <property name="username">admin</property>
+    <property name="receiverURL">tcp://localhost:7612</property>
+    <property name="password">admin</property>
+    <property name="authenticatorURL">ssl://localhost:7712</property>
+</outputEventAdaptor>

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
index 4c4c7e0..a256770 100644
--- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
+++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
@@ -23,287 +23,289 @@
 <streamManagerConfiguration xmlns="http://wso2.org/carbon/streammanager">
     <!-- in-flight requests stream definitions start -->
     <streamDefinition name="in_flight_requests" version="1.0.0">
-         <description>in-flight request count</description>
-         <nickName>in-flight requests</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="in_flight_request_count" type="double"/>
-         </payloadData>
-     </streamDefinition> 
+        <description>in-flight request count</description>
+        <nickName>in-flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="time_stamp" type="long"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="in_flight_request_count" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="gradient_in_flight_requests" version="1.0.0">
-         <description>gradient of in flight request count</description>
-         <nickName>gradient in flight requests</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="count" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>gradient of in flight request count</description>
+        <nickName>gradient in flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="count" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="average_in_flight_requests" version="1.0.0">
-         <description>average of in-flight request count</description>
-         <nickName>average in-flight requests</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="count" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>average of in-flight request count</description>
+        <nickName>average in-flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="count" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="second_derivative_in_flight_requests" version="1.0.0">
-         <description>second derivative of in-flight request count</description>
-         <nickName>second derivative in-flight requests</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="count" type="double"/>
-         </payloadData>
+        <description>second derivative of in-flight request count</description>
+        <nickName>second derivative in-flight requests</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="count" type="double"/>
+        </payloadData>
     </streamDefinition>
     <!-- in-flight requests stream definitions end -->
 
     <!-- cartridge agent health stats stream definitions start -->
     <streamDefinition name="cartridge_agent_health_stats" version="1.0.0">
-         <description>agent health stats</description>
-         <nickName>agent health stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-	         <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_id" type="String" />
-             <property name="partition_id" type="String" />
-             <property name="health_description" type="String"/>
-             <property name="value" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>agent health stats</description>
+        <nickName>agent health stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="time_stamp" type="long"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_id" type="String"/>
+            <property name="partition_id" type="String"/>
+            <property name="health_description" type="String"/>
+            <property name="value" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="average_load_average_stats" version="1.0.0">
-         <description>average load average stats</description>
-         <nickName>average load average stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-	     <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="average_load_average" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>average load average stats</description>
+        <nickName>average load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="average_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="average_memory_consumption_stats" version="1.0.0">
-         <description>average memory consumption stats</description>
-         <nickName>average memory consumption stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="average_memory_consumption" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>average memory consumption stats</description>
+        <nickName>average memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="average_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="gradient_load_average_stats" version="1.0.0">
-         <description>gradient load average stats</description>
-         <nickName>gradient load average stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-	     <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="gradient_load_average" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>gradient load average stats</description>
+        <nickName>gradient load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="gradient_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="gradient_memory_consumption_stats" version="1.0.0">
-         <description>gradient memoryconsumption stats</description>
-         <nickName>gradient memoryconsumption stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-	     <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="gradient_memory_consumption" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>gradient memoryconsumption stats</description>
+        <nickName>gradient memoryconsumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="gradient_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="second_derivative_memory_consumption_stats" version="1.0.0">
-         <description>second derivative memory consumption stats</description>
-         <nickName>second derivative memory consumption stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-	     <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="second_derivative_memory_consumption" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>second derivative memory consumption stats</description>
+        <nickName>second derivative memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="second_derivative_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="second_derivative_load_average_stats" version="1.0.0">
-         <description>second derivative load average stats</description>
-         <nickName>second derivative load average stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-	     <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="second_derivative_load_average" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>second derivative load average stats</description>
+        <nickName>second derivative load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="second_derivative_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="fault_message" version="1.0.0">
-         <description>fault message</description>
-         <nickName>fault message</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_id" type="String"/>
-             <property name="partition_id" type="String"/>
-         </payloadData>
+        <description>fault message</description>
+        <nickName>fault message</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_id" type="String"/>
+            <property name="partition_id" type="String"/>
+        </payloadData>
     </streamDefinition>
     <!-- cartridge agent health stats stream definitions end -->
 
     <!-- This is for member_id wise grouping-->
     <streamDefinition name="member_average_load_average_stats" version="1.0.0">
-         <description>average load average stats</description>
-         <nickName>average load average stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="member_id" type="String" />
-             <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_average_load_average" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>average load average stats</description>
+        <nickName>average load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_average_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="member_average_memory_consumption_stats" version="1.0.0">
-         <description>average memory consumption stats</description>
-         <nickName>average memory consumption stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="member_id" type="String"/>
-             <property name="cluster_id" type="String"/>
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_average_memory_consumption" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>average memory consumption stats</description>
+        <nickName>average memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_average_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="member_gradient_load_average_stats" version="1.0.0">
-         <description>gradient load average stats</description>
-         <nickName>gradient load average stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="member_id" type="String" />
-             <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_gradient_load_average" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>gradient load average stats</description>
+        <nickName>gradient load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_gradient_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="member_gradient_memory_consumption_stats" version="1.0.0">
-         <description>gradient memoryconsumption stats</description>
-         <nickName>gradient memoryconsumption stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="member_id" type="String" />
-             <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_gradient_memory_consumption" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>gradient memoryconsumption stats</description>
+        <nickName>gradient memoryconsumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_gradient_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="member_second_derivative_memory_consumption_stats" version="1.0.0">
-         <description>second derivative memory consumption stats</description>
-         <nickName>second derivative memory consumption stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="member_id" type="String" />
-             <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_second_derivative_memory_consumption" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>second derivative memory consumption stats</description>
+        <nickName>second derivative memory consumption stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_second_derivative_memory_consumption" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
     <streamDefinition name="member_second_derivative_load_average_stats" version="1.0.0">
-         <description>second derivative load average stats</description>
-         <nickName>second derivative load average stats</nickName>
-         <metaData>
-         </metaData>
-         <correlationData>
-         </correlationData>
-         <payloadData>
-             <property name="member_id" type="String" />
-             <property name="cluster_id" type="String" />
-             <property name="cluster_instance_id" type="String"/>
-             <property name="network_partition_id" type="String"/>
-             <property name="member_second_derivative_load_average" type="double"/>
-         </payloadData>
-     </streamDefinition>
+        <description>second derivative load average stats</description>
+        <nickName>second derivative load average stats</nickName>
+        <metaData>
+        </metaData>
+        <correlationData>
+        </correlationData>
+        <payloadData>
+            <property name="member_id" type="String"/>
+            <property name="cluster_id" type="String"/>
+            <property name="cluster_instance_id" type="String"/>
+            <property name="network_partition_id" type="String"/>
+            <property name="member_second_derivative_load_average" type="double"/>
+        </payloadData>
+    </streamDefinition>
 
 </streamManagerConfiguration>

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/README.md
----------------------------------------------------------------------
diff --git a/extensions/das/README.md b/extensions/das/README.md
new file mode 100644
index 0000000..00be297
--- /dev/null
+++ b/extensions/das/README.md
@@ -0,0 +1,10 @@
+# Apache Stratos DAS Extensions
+
+Apache Stratos Data Analytics Server (DAS) extensions include DAS artifacts and the Spark UDFs needed to run the Spark scripts.
+These extensions need to be deployed manually when running DAS externally.
+
+Please refer to the link below for more information on WSO2 DAS.
+https://docs.wso2.com/display/DAS300/WSO2+Data+Analytics+Server+Documentation
+
+Thank you for using Apache Stratos!
+The Stratos Team
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml b/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml
new file mode 100644
index 0000000..05789db
--- /dev/null
+++ b/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventReceiver name="CloudControllerEventReceiver" statistics="disable"
+               trace="enable" xmlns="http://wso2.org/carbon/eventreceiver">
+    <from eventAdapterType="wso2event">
+        <property name="events.duplicated.in.cluster">false</property>
+    </from>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to streamName="org.apache.stratos.cloud.controller" version="1.0.0"/>
+</eventReceiver>

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml b/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml
new file mode 100644
index 0000000..7e0a5ce
--- /dev/null
+++ b/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventReceiver name="HealthStatsEventReceiver" statistics="disable"
+               trace="enable" xmlns="http://wso2.org/carbon/eventreceiver">
+    <from eventAdapterType="wso2event">
+        <property name="events.duplicated.in.cluster">false</property>
+    </from>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to streamName="cartridge_agent_health_stats" version="1.0.0"/>
+</eventReceiver>

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml b/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml
new file mode 100644
index 0000000..b11c016
--- /dev/null
+++ b/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventReceiver name="RIFEventReceiver" statistics="disable"
+               trace="enable" xmlns="http://wso2.org/carbon/eventreceiver">
+    <from eventAdapterType="wso2event">
+        <property name="events.duplicated.in.cluster">false</property>
+    </from>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to streamName="in_flight_requests" version="1.0.0"/>
+</eventReceiver>

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml b/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml
new file mode 100644
index 0000000..b870bc2
--- /dev/null
+++ b/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<EventStoreConfiguration>
+    <TableSchema>
+        <ColumnDefinition>
+            <Name>time_stamp</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>LONG</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>cluster_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>cluster_instance_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>network_partition_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>member_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>partition_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>health_description</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>value</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>DOUBLE</Type>
+        </ColumnDefinition>
+    </TableSchema>
+    <Source>
+        <StreamId>cartridge_agent_health_stats:1.0.0</StreamId>
+    </Source>
+    <RecordStoreName>EVENT_STORE</RecordStoreName>
+</EventStoreConfiguration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventsink/in_flight_requests.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventsink/in_flight_requests.xml b/extensions/das/artifacts/eventsink/in_flight_requests.xml
new file mode 100644
index 0000000..d4ca48b
--- /dev/null
+++ b/extensions/das/artifacts/eventsink/in_flight_requests.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<EventStoreConfiguration>
+    <TableSchema>
+        <ColumnDefinition>
+            <Name>time_stamp</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>LONG</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>cluster_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>cluster_instance_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>network_partition_id</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>in_flight_request_count</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>DOUBLE</Type>
+        </ColumnDefinition>
+    </TableSchema>
+    <Source>
+        <StreamId>in_flight_requests:1.0.0</StreamId>
+    </Source>
+    <RecordStoreName>EVENT_STORE</RecordStoreName>
+</EventStoreConfiguration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml b/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml
new file mode 100644
index 0000000..f0dae09
--- /dev/null
+++ b/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<EventStoreConfiguration>
+    <TableSchema>
+        <ColumnDefinition>
+            <Name>timeStamp</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>LONG</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>memberId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>cartridgeType</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>clusterId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>clusterInstanceId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>lbclusterId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>partitionId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>networkId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>instanceType</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>scalingReason</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>scalingTime</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>LONG</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>isMultiTenant</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>iaas</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>status</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>hostName</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>hypervisor</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>ram</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>imageId</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>loginPort</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>INTEGER</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>osName</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>osVersion</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>osArch</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>is64bitOS</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>privateIPAddresses</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>publicIPAddresses</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+        <ColumnDefinition>
+            <Name>allocateIPAddresses</Name>
+            <EnableIndexing>false</EnableIndexing>
+            <IsPrimaryKey>false</IsPrimaryKey>
+            <EnableScoreParam>false</EnableScoreParam>
+            <Type>STRING</Type>
+        </ColumnDefinition>
+    </TableSchema>
+    <Source>
+        <StreamId>org.apache.stratos.cloud.controller:1.0.0</StreamId>
+    </Source>
+    <RecordStoreName>EVENT_STORE</RecordStoreName>
+</EventStoreConfiguration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json b/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json
new file mode 100644
index 0000000..ec61229
--- /dev/null
+++ b/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json
@@ -0,0 +1,40 @@
+{
+  "name": "cartridge_agent_health_stats",
+  "version": "1.0.0",
+  "nickName": "",
+  "description": "",
+  "payloadData": [
+    {
+      "name": "time_stamp",
+      "type": "LONG"
+    },
+    {
+      "name": "cluster_id",
+      "type": "STRING"
+    },
+    {
+      "name": "cluster_instance_id",
+      "type": "STRING"
+    },
+    {
+      "name": "network_partition_id",
+      "type": "STRING"
+    },
+    {
+      "name": "member_id",
+      "type": "STRING"
+    },
+    {
+      "name": "partition_id",
+      "type": "STRING"
+    },
+    {
+      "name": "health_description",
+      "type": "STRING"
+    },
+    {
+      "name": "value",
+      "type": "DOUBLE"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json b/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json
new file mode 100644
index 0000000..8c5232a
--- /dev/null
+++ b/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json
@@ -0,0 +1,28 @@
+{
+  "name": "in_flight_requests",
+  "version": "1.0.0",
+  "nickName": "",
+  "description": "",
+  "payloadData": [
+    {
+      "name": "time_stamp",
+      "type": "LONG"
+    },
+    {
+      "name": "cluster_id",
+      "type": "STRING"
+    },
+    {
+      "name": "cluster_instance_id",
+      "type": "STRING"
+    },
+    {
+      "name": "network_partition_id",
+      "type": "STRING"
+    },
+    {
+      "name": "in_flight_request_count",
+      "type": "DOUBLE"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json b/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json
new file mode 100644
index 0000000..de1025f
--- /dev/null
+++ b/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json
@@ -0,0 +1,112 @@
+{
+  "name": "org.apache.stratos.cloud.controller",
+  "version": "1.0.0",
+  "nickName": "cloud.controller",
+  "description": "Instances booted up by the Cloud Controller",
+  "payloadData": [
+    {
+      "name": "timeStamp",
+      "type": "LONG"
+    },
+    {
+      "name": "memberId",
+      "type": "STRING"
+    },
+    {
+      "name": "cartridgeType",
+      "type": "STRING"
+    },
+    {
+      "name": "clusterId",
+      "type": "STRING"
+    },
+    {
+      "name": "clusterInstanceId",
+      "type": "STRING"
+    },
+    {
+      "name": "lbclusterId",
+      "type": "STRING"
+    },
+    {
+      "name": "partitionId",
+      "type": "STRING"
+    },
+    {
+      "name": "networkId",
+      "type": "STRING"
+    },
+    {
+      "name": "instanceType",
+      "type": "STRING"
+    },
+    {
+      "name": "scalingReason",
+      "type": "STRING"
+    },
+    {
+      "name": "scalingTime",
+      "type": "LONG"
+    },
+    {
+      "name": "isMultiTenant",
+      "type": "STRING"
+    },
+    {
+      "name": "iaas",
+      "type": "STRING"
+    },
+    {
+      "name": "status",
+      "type": "STRING"
+    },
+    {
+      "name": "hostName",
+      "type": "STRING"
+    },
+    {
+      "name": "hypervisor",
+      "type": "STRING"
+    },
+    {
+      "name": "ram",
+      "type": "STRING"
+    },
+    {
+      "name": "imageId",
+      "type": "STRING"
+    },
+    {
+      "name": "loginPort",
+      "type": "INT"
+    },
+    {
+      "name": "osName",
+      "type": "STRING"
+    },
+    {
+      "name": "osVersion",
+      "type": "STRING"
+    },
+    {
+      "name": "osArch",
+      "type": "STRING"
+    },
+    {
+      "name": "is64bitOS",
+      "type": "STRING"
+    },
+    {
+      "name": "privateIPAddresses",
+      "type": "STRING"
+    },
+    {
+      "name": "publicIPAddresses",
+      "type": "STRING"
+    },
+    {
+      "name": "allocateIPAddresses",
+      "type": "STRING"
+    }
+  ]
+}
\ No newline at end of file


[5/5] stratos git commit: Refactoring Monitoring Service Classes

Posted by ga...@apache.org.
Refactoring Monitoring Service Classes


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7575faa3
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7575faa3
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7575faa3

Branch: refs/heads/tenant-isolation
Commit: 7575faa30b90a477931c857768b70b369f7fa9c3
Parents: 54283ed
Author: Thanuja <th...@wso2.com>
Authored: Wed Aug 12 18:16:54 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Thu Aug 20 10:59:16 2015 +0530

----------------------------------------------------------------------
 .../publisher/HealthStatisticsPublisher.java    |   4 +-
 .../publisher/InFlightRequestPublisher.java     |   4 +-
 .../publisher/StatisticsPublisherType.java      |   2 +-
 .../publisher/ThriftClientConfig.java           |  81 +++++++++++
 .../publisher/ThriftClientConfigParser.java     | 139 +++++++++++++++++++
 .../statistics/publisher/ThriftClientInfo.java  |  63 +++++++++
 .../publisher/ThriftStatisticsPublisher.java    | 115 +++++++++++++++
 .../publisher/wso2/cep/ThriftClientConfig.java  |  81 -----------
 .../wso2/cep/ThriftClientConfigParser.java      | 139 -------------------
 .../publisher/wso2/cep/ThriftClientInfo.java    |  63 ---------
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |  26 ++--
 .../cep/WSO2CEPInFlightRequestPublisher.java    |  28 ++--
 .../wso2/cep/WSO2CEPStatisticsPublisher.java    | 114 ---------------
 .../test/ThriftClientConfigParserTest.java      |  13 +-
 .../modules/healthstatspublisher/healthstats.py |   4 +-
 .../streamdefinitions/stream-manager-config.xml |   4 +-
 16 files changed, 447 insertions(+), 433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index 95b04ff..6af1317 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -27,7 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
     /**
      * Publish health statistics to complex event processor.
      *
-     * @param timeStamp          time
+     * @param timestamp          Time
      * @param clusterId          Cluster id of the member
      * @param clusterInstanceId  Cluster instance id of the member
      * @param networkPartitionId Network partition id of the member
@@ -36,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                  String memberId, String partitionId, String health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index af9c8e9..e4e65c0 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -27,12 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
     /**
      * Publish in-flight request count.
      *
-     * @param timeStamp            time
+     * @param timestamp            Time
      * @param clusterId            Cluster id
      * @param clusterInstanceId    Cluster instance id
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                  int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
index 77c5b78..d4b9a87 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
@@ -23,5 +23,5 @@ package org.apache.stratos.common.statistics.publisher;
  * Statistics publisher type enumneration.
  */
 public enum StatisticsPublisherType {
-    WSO2CEP
+    WSO2CEP, WSO2DAS
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
new file mode 100644
index 0000000..25ee897
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Thrift client configuration, loaded once from the file named by a system property.
+ */
+public class ThriftClientConfig {
+
+    public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path";
+
+    private static volatile ThriftClientConfig instance;
+    private ThriftClientInfo thriftClientInfo;
+
+    /*
+    * Package-private constructor: only classes in this
+    * package (e.g. ThriftClientConfigParser) can instantiate.
+    */
+    ThriftClientConfig() {
+    }
+
+    public static ThriftClientConfig getInstance() {
+        if (instance == null) {
+            synchronized (ThriftClientConfig.class) {
+                if (instance == null) {
+                    String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH);
+                    if (StringUtils.isBlank(configFilePath)) {
+                        throw new RuntimeException(String.format("Thrift client configuration file path system " +
+                                "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH));
+                    }
+                    instance = ThriftClientConfigParser.parse(configFilePath);
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the ThriftClientInfo object that stores the credential information.
+     * Thrift client credential information can be found in the thrift-client-config.xml file.
+     * This credential information is parsed and assigned to the ThriftClientInfo
+     * object.
+     * <p/>
+     * This method is used to return the assigned values in the ThriftClientInfo object.
+     *
+     * @return ThriftClientInfo object which consists of username, password, ip and port values
+     */
+    public ThriftClientInfo getThriftClientInfo() {
+        return thriftClientInfo;
+    }
+
+    /**
+     * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken
+     * from thrift-client-config.xml file.
+     *
+     * @param thriftClientInfo Object of the ThriftClientInfo
+     */
+    public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) {
+        this.thriftClientInfo = thriftClientInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
new file mode 100644
index 0000000..aa05d6f
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.util.AxiomXpathParserUtil;
+import org.wso2.securevault.SecretResolver;
+import org.wso2.securevault.SecretResolverFactory;
+
+import java.io.File;
+import java.util.Iterator;
+
+/**
+ * Thrift client config parser.
+ */
+public class ThriftClientConfigParser {
+
+    private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class);
+
+    /**
+     * Fields to be read from the thrift-client-config.xml file
+     */
+    private static final String USERNAME_ELEMENT = "username";
+    private static final String PASSWORD_ELEMENT = "password";
+    private static final String IP_ELEMENT = "ip";
+    private static final String PORT_ELEMENT = "port";
+
+    /**
+     * Reads the thrift-client-config.xml file and assigns the credential values it
+     * contains to a ThriftClientInfo object, which is attached to a newly created
+     * ThriftClientConfig instance.
+     * <p/>
+     * Any failure (missing file, missing element, parse error) is rethrown as a RuntimeException.
+     *
+     * @param filePath the path to thrift-client-config.xml file
+     * @return ThriftClientConfig object
+     */
+    public static ThriftClientConfig parse(String filePath) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Parsing thrift client config file: %s", filePath));
+            }
+
+            ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
+            ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
+            thriftClientIConfig.setThriftClientInfo(thriftClientInfo);
+
+            File configFile = new File(filePath);
+            if (!configFile.exists()) {
+                throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath));
+            }
+            OMElement document = AxiomXpathParserUtil.parse(configFile);
+            Iterator thriftClientIterator = document.getChildElements();
+
+            //Initialize the SecretResolver providing the configuration element.
+            SecretResolver secretResolver = SecretResolverFactory.create(document, false);
+
+            String userNameValuesStr = null;
+            String passwordValueStr = null;
+            String ipValuesStr = null;
+            String portValueStr = null;
+
+            //same entry used in cipher-text.properties and cipher-tool.properties.
+            String secretAlias = "thrift.client.configuration.password";
+
+            // Iterate over the child elements of thrift-client-config.xml and read the
+            // credential information required by ThriftStatisticsPublisher
+            while (thriftClientIterator.hasNext()) {
+                OMElement thriftClientElement = (OMElement) thriftClientIterator.next();
+
+                if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    userNameValuesStr = thriftClientElement.getText();
+                    thriftClientInfo.setUsername(userNameValuesStr);
+                }
+                // password may be protected by secure vault; otherwise the element text is used
+                if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    if ((secretResolver != null) && (secretResolver.isInitialized())) {
+                        if (secretResolver.isTokenProtected(secretAlias)) {
+                            passwordValueStr = secretResolver.resolve(secretAlias);
+                        } else {
+                            passwordValueStr = thriftClientElement.getText();
+                        }
+                    } else {
+                        passwordValueStr = thriftClientElement.getText();
+                    }
+                    thriftClientInfo.setPassword(passwordValueStr);
+                }
+
+                if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    ipValuesStr = thriftClientElement.getText();
+                    thriftClientInfo.setIp(ipValuesStr);
+                }
+
+                if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+                    portValueStr = thriftClientElement.getText();
+                    thriftClientInfo.setPort(portValueStr);
+                }
+            }
+
+            if (userNameValuesStr == null) {
+                throw new RuntimeException("Username value not found in thrift client configuration");
+            }
+            if (passwordValueStr == null) {
+                throw new RuntimeException("Password not found in thrift client configuration ");
+            }
+
+            if (ipValuesStr == null) {
+                throw new RuntimeException("Ip values not found in thrift client configuration ");
+            }
+
+            if (portValueStr == null) {
+                throw new RuntimeException("Port not found in thrift client configuration ");
+            }
+
+            return thriftClientIConfig;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not parse thrift client configuration", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
new file mode 100644
index 0000000..514d907
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+/**
+ * Holds the thrift client connection details: username, password, ip and port.
+ */
+public class ThriftClientInfo {
+    private String username;
+    private String password;
+    private String ip;
+    private String port;
+
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getIp() {
+        return ip;
+    }
+
+    public void setIp(String ip) {
+        this.ip = ip;
+    }
+
+    public String getPort() {
+        return port;
+    }
+
+    public void setPort(String port) {
+        this.port = port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
new file mode 100644
index 0000000..6a9e955
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.HashMap;
+
+/**
+ * Publishes statistics events to a thrift endpoint through an AsyncDataPublisher.
+ */
+public class ThriftStatisticsPublisher implements StatisticsPublisher {
+
+    private static final Log log = LogFactory.getLog(ThriftStatisticsPublisher.class);
+
+    private StreamDefinition streamDefinition;
+    private AsyncDataPublisher asyncDataPublisher;
+    private String ip;
+    private String port;
+    private String username;
+    private String password;
+    private boolean enabled = false;
+
+    /**
+     * Reads ip, port, username and password from the ThriftClientConfig (thrift-client-config.xml).
+     *
+     * @param streamDefinition      Thrift Event Stream Definition
+     * @param statsPublisherEnabled name of the boolean system property that enables this publisher
+     */
+    public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String statsPublisherEnabled) {
+        ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
+        ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo();
+
+        this.streamDefinition = streamDefinition;
+        this.ip = thriftClientInfo.getIp();
+        this.port = thriftClientInfo.getPort();
+        this.username = thriftClientInfo.getUsername();
+        this.password = thriftClientInfo.getPassword();
+
+        enabled = Boolean.getBoolean(statsPublisherEnabled); // looks up the system property with that name
+        if (enabled) {
+            init();
+        }
+    }
+
+    private void init() {
+        AgentConfiguration agentConfiguration = new AgentConfiguration();
+        Agent agent = new Agent(agentConfiguration);
+
+        // Initialize asynchronous data publisher
+        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
+        asyncDataPublisher.addStreamDefinition(streamDefinition);
+    }
+
+    @Override
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+        if (this.enabled && asyncDataPublisher == null) { // initialize once; do not replace a live publisher
+            init();
+        }
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    @Override
+    public void publish(Object[] payload) {
+        if (!isEnabled()) {
+            throw new RuntimeException("Statistics publisher is not enabled");
+        }
+
+        Event event = new Event();
+        event.setPayloadData(payload);
+        event.setArbitraryDataMap(new HashMap<String, String>());
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing thrift event: [stream] %s [version] %s",
+                        streamDefinition.getName(), streamDefinition.getVersion()));
+            }
+            asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
+        } catch (AgentException e) {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish thrift event: [stream] %s [version] %s",
+                        streamDefinition.getName(), streamDefinition.getVersion()), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
deleted file mode 100644
index 178c5a6..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * Thrift Client configuration.
- */
-public class ThriftClientConfig {
-
-    public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path";
-
-    private static volatile ThriftClientConfig instance;
-    private ThriftClientInfo thriftClientInfo;
-
-    /*
-    * A private Constructor prevents any other
-    * class from instantiating.
-    */
-    ThriftClientConfig() {
-    }
-
-    public static ThriftClientConfig getInstance() {
-        if (instance == null) {
-            synchronized (ThriftClientConfig.class) {
-                if (instance == null) {
-                    String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH);
-                    if (StringUtils.isBlank(configFilePath)) {
-                        throw new RuntimeException(String.format("Thrift client configuration file path system " +
-                                "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH));
-                    }
-                    instance = ThriftClientConfigParser.parse(configFilePath);
-                }
-            }
-        }
-        return instance;
-    }
-
-    /**
-     * Returns an ThriftClientInfo Object that stores the credential information.
-     * CEP credential information can be found under thrift-client-config.xml file
-     * These credential information then get parsed and assigned into ThriftClientInfo
-     * Object.
-     * <p/>
-     * This method is used to return the assigned values in ThriftClientInfo Object
-     *
-     * @return ThriftClientInfo object which consists of username,password,ip and port values
-     */
-    public ThriftClientInfo getThriftClientInfo() {
-        return thriftClientInfo;
-    }
-
-    /**
-     * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken
-     * from thrift-client-config.xml file.
-     *
-     * @param thriftClientInfo Object of the ThriftClientInfo
-     */
-    public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) {
-        this.thriftClientInfo = thriftClientInfo;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
deleted file mode 100644
index ae3c692..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.util.AxiomXpathParserUtil;
-import org.wso2.securevault.SecretResolver;
-import org.wso2.securevault.SecretResolverFactory;
-
-import java.io.File;
-import java.util.Iterator;
-
-/**
- * Thrift client config parser.
- */
-public class ThriftClientConfigParser {
-
-    private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class);
-
-    /**
-     * Fields to be read from the thrift-client-config.xml file
-     */
-    private static final String USERNAME_ELEMENT = "username";
-    private static final String PASSWORD_ELEMENT = "password";
-    private static final String IP_ELEMENT = "ip";
-    private static final String PORT_ELEMENT = "port";
-
-    /**
-     * This method reads thrift-client-config.xml file and assign necessary credential
-     * values into thriftClientInfo object.  A singleton design has been implemented
-     * with the use of thriftClientIConfig class.
-     * <p/>
-     * The filePath argument is the path to thrift-client-config.xml file
-     *
-     * @param filePath the path to thrift-client-config.xml file
-     * @return ThriftClientConfig object
-     */
-    public static ThriftClientConfig parse(String filePath) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Parsing thrift client config file: %s", filePath));
-            }
-
-            ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
-            ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
-            thriftClientIConfig.setThriftClientInfo(thriftClientInfo);
-
-            File configFile = new File(filePath);
-            if (!configFile.exists()) {
-                throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath));
-            }
-            OMElement document = AxiomXpathParserUtil.parse(configFile);
-            Iterator thriftClientIterator = document.getChildElements();
-
-            //Initialize the SecretResolver providing the configuration element.
-            SecretResolver secretResolver = SecretResolverFactory.create(document, false);
-
-            String userNameValuesStr = null;
-            String passwordValueStr = null;
-            String ipValuesStr = null;
-            String portValueStr = null;
-
-            //same entry used in cipher-text.properties and cipher-tool.properties.
-            String secretAlias = "thrift.client.configuration.password";
-
-            // Iterate the thrift-client-config.xml file and read child element
-            // consists of credential information necessary for WSO2CEPStatisticsPublisher
-            while (thriftClientIterator.hasNext()) {
-                OMElement thriftClientElement = (OMElement) thriftClientIterator.next();
-
-                if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    userNameValuesStr = thriftClientElement.getText();
-                    thriftClientInfo.setUsername(userNameValuesStr);
-                }
-                //password field protected using Secure vault
-                if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    if ((secretResolver != null) && (secretResolver.isInitialized())) {
-                        if (secretResolver.isTokenProtected(secretAlias)) {
-                            passwordValueStr = secretResolver.resolve(secretAlias);
-                        } else {
-                            passwordValueStr = thriftClientElement.getText();
-                        }
-                    } else {
-                        passwordValueStr = thriftClientElement.getText();
-                    }
-                    thriftClientInfo.setPassword(passwordValueStr);
-                }
-
-                if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    ipValuesStr = thriftClientElement.getText();
-                    thriftClientInfo.setIp(ipValuesStr);
-                }
-
-                if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
-                    portValueStr = thriftClientElement.getText();
-                    thriftClientInfo.setPort(portValueStr);
-                }
-            }
-
-            if (userNameValuesStr == null) {
-                throw new RuntimeException("Username value not found in thrift client configuration");
-            }
-            if (passwordValueStr == null) {
-                throw new RuntimeException("Password not found in thrift client configuration ");
-            }
-
-            if (ipValuesStr == null) {
-                throw new RuntimeException("Ip values not found in thrift client configuration ");
-            }
-
-            if (portValueStr == null) {
-                throw new RuntimeException("Port not found in thrift client configuration ");
-            }
-
-            return thriftClientIConfig;
-        } catch (Exception e) {
-            throw new RuntimeException("Could not parse thrift client configuration", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
deleted file mode 100644
index 1a9ba81..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-/**
- * Thrift Client Info
- */
-public class ThriftClientInfo {
-    private String username;
-    private String password;
-    private String ip;
-    private String port;
-
-
-    public String getUsername() {
-        return username;
-    }
-
-    public void setUsername(String username) {
-        this.username = username;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public String getIp() {
-        return ip;
-    }
-
-    public void setIp(String ip) {
-        this.ip = ip;
-    }
-
-    public String getPort() {
-        return port;
-    }
-
-    public void setPort(String port) {
-        this.port = port;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d5c9265..33cf0b5 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,6 +22,7 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -32,15 +33,16 @@ import java.util.List;
 /**
  * Health statistics publisher for publishing statistics to WSO2 CEP.
  */
-public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
 
     private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
 
+    private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
     private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
     private static final String VERSION = "1.0.0";
 
     public WSO2CEPHealthStatisticsPublisher() {
-        super(createStreamDefinition());
+        super(createStreamDefinition(), STATS_PUBLISHER_ENABLED);
     }
 
     private static StreamDefinition createStreamDefinition() {
@@ -71,17 +73,17 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish health statistics to cep.
      *
-     * @param timeStamp
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param memberId
-     * @param partitionId
-     * @param health
-     * @param value
+     * @param timestamp          Time
+     * @param clusterId          Cluster id of the member
+     * @param clusterInstanceId  Cluster instance id of the member
+     * @param networkPartitionId Network partition id of the member
+     * @param memberId           Member id
+     * @param partitionId        Partition id of the member
+     * @param health             Health type: memory_consumption | load_average
+     * @param value              Health type value
      */
     @Override
-    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                         String memberId, String partitionId, String health, double value) {
         if (log.isDebugEnabled()) {
             log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " +
@@ -90,7 +92,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
         }
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
-        payload.add(timeStamp);
+        payload.add(timestamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index f51eb91..f853f23 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -19,7 +19,10 @@
 
 package org.apache.stratos.common.statistics.publisher.wso2.cep;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
 import org.wso2.carbon.databridge.commons.Attribute;
 import org.wso2.carbon.databridge.commons.AttributeType;
 import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,13 +36,15 @@ import java.util.List;
  * In-flight request count:
  * Number of requests being served at a given moment could be identified as in-flight request count.
  */
-public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
+    private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
 
+    private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
     private static final String DATA_STREAM_NAME = "in_flight_requests";
     private static final String VERSION = "1.0.0";
 
     public WSO2CEPInFlightRequestPublisher() {
-        super(createStreamDefinition());
+        super(createStreamDefinition(), STATS_PUBLISHER_ENABLED);
     }
 
     private static StreamDefinition createStreamDefinition() {
@@ -66,18 +71,23 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish in-flight request count of a cluster.
      *
-     * @param timeStamp
-     * @param clusterId
-     * @param clusterInstanceId
-     * @param networkPartitionId
-     * @param inFlightRequestCount
+     * @param timestamp            Time
+     * @param clusterId            Cluster id
+     * @param clusterInstanceId    Cluster instance id
+     * @param networkPartitionId   Network partition id of the cluster
+     * @param inFlightRequestCount In-flight request count of the cluster
      */
     @Override
-    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+    public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                         int inFlightRequestCount) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Publishing health statistics: [timestamp] %d [cluster] %s " +
+                            "[cluster-instance] %s [network-partition] %s [in-flight-request-count] %d",
+                    timestamp, clusterId, clusterInstanceId, networkPartitionId, inFlightRequestCount));
+        }
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
-        payload.add(timeStamp);
+        payload.add(timestamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
deleted file mode 100644
index 653288d..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.HashMap;
-
-/**
- * WSO2 CEP statistics publisher.
- */
-public class WSO2CEPStatisticsPublisher implements StatisticsPublisher {
-
-    private static final Log log = LogFactory.getLog(WSO2CEPStatisticsPublisher.class);
-
-    private StreamDefinition streamDefinition;
-    private AsyncDataPublisher asyncDataPublisher;
-    private String ip;
-    private String port;
-    private String username;
-    private String password;
-    private boolean enabled = false;
-
-    /**
-     * Credential information stored inside thrift-client-config.xml file
-     * is parsed and assigned into ip,port,username and password fields
-     *
-     * @param streamDefinition
-     */
-    public WSO2CEPStatisticsPublisher(StreamDefinition streamDefinition) {
-        ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
-        thriftClientConfig.getThriftClientInfo();
-
-        this.streamDefinition = streamDefinition;
-        this.ip = thriftClientConfig.getThriftClientInfo().getIp();
-        this.port = thriftClientConfig.getThriftClientInfo().getPort();
-        this.username = thriftClientConfig.getThriftClientInfo().getUsername();
-        this.password = thriftClientConfig.getThriftClientInfo().getPassword();
-
-        enabled = Boolean.getBoolean("cep.stats.publisher.enabled");
-        if (enabled) {
-            init();
-        }
-    }
-
-    private void init() {
-        AgentConfiguration agentConfiguration = new AgentConfiguration();
-        Agent agent = new Agent(agentConfiguration);
-
-        // Initialize asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
-        asyncDataPublisher.addStreamDefinition(streamDefinition);
-    }
-
-    @Override
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-        if (this.enabled) {
-            init();
-        }
-    }
-
-    @Override
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    @Override
-    public void publish(Object[] payload) {
-        if (!isEnabled()) {
-            throw new RuntimeException("Statistics publisher is not enabled");
-        }
-
-        Event event = new Event();
-        event.setPayloadData(payload);
-        event.setArbitraryDataMap(new HashMap<String, String>());
-
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
-            }
-            asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
-        } catch (AgentException e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
index 09d5b5e..352041d 100644
--- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
+++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
@@ -20,7 +20,8 @@
 package org.apache.stratos.common.test;
 
 import junit.framework.TestCase;
-import org.apache.stratos.common.statistics.publisher.wso2.cep.ThriftClientConfig;
+import org.apache.stratos.common.statistics.publisher.ThriftClientConfig;
+import org.apache.stratos.common.statistics.publisher.ThriftClientInfo;
 import org.junit.Test;
 
 import java.net.URL;
@@ -41,11 +42,11 @@ public class ThriftClientConfigParserTest extends TestCase {
         URL configFileUrl = ThriftClientConfigParserTest.class.getResource("/thrift-client-config.xml");
         System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, configFileUrl.getPath());
         ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
-        thriftClientConfig.getThriftClientInfo();
+        ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo();
 
-        assertEquals("Incorrect Password", "admin", thriftClientConfig.getThriftClientInfo().getUsername());
-        assertEquals("Incorrect Password", "1234", thriftClientConfig.getThriftClientInfo().getPassword());
-        assertEquals("Incorrect IP", "192.168.10.10", thriftClientConfig.getThriftClientInfo().getIp());
-        assertEquals("Incorrect Port", "9300", thriftClientConfig.getThriftClientInfo().getPort());
+        assertEquals("Incorrect Username", "admin", thriftClientInfo.getUsername());
+        assertEquals("Incorrect Password", "1234", thriftClientInfo.getPassword());
+        assertEquals("Incorrect IP", "192.168.10.10", thriftClientInfo.getIp());
+        assertEquals("Incorrect Port", "9300", thriftClientInfo.getPort());
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
index aae9e9d..c0b3981 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
@@ -124,7 +124,7 @@ class HealthStatisticsPublisher:
         stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
 
         # stream_def.add_payloaddata_attribute()
-        stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG)
+        stream_def.add_payloaddata_attribute("timestamp", StreamDefinition.LONG)
         stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
@@ -199,7 +199,7 @@ class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader):
         (one, five, fifteen) = os.getloadavg()
         cores = multiprocessing.cpu_count()
 
-        return (one/cores) * 100
+        return (one / cores) * 100
 
 
 class CEPPublisherConfiguration:

http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
index a256770..9e0a833 100644
--- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
+++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
@@ -30,7 +30,7 @@
         <correlationData>
         </correlationData>
         <payloadData>
-            <property name="time_stamp" type="long"/>
+            <property name="timestamp" type="long"/>
             <property name="cluster_id" type="String"/>
             <property name="cluster_instance_id" type="String"/>
             <property name="network_partition_id" type="String"/>
@@ -93,7 +93,7 @@
         <correlationData>
         </correlationData>
         <payloadData>
-            <property name="time_stamp" type="long"/>
+            <property name="timestamp" type="long"/>
             <property name="cluster_id" type="String"/>
             <property name="cluster_instance_id" type="String"/>
             <property name="network_partition_id" type="String"/>


[3/5] stratos git commit: Adding Metering and Monitoring Service Implementation

Posted by ga...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index dab6827..f76c928 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl
 import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.instance.ClusterInstance;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -67,8 +68,11 @@ public class TopologyBuilder {
             TopologyManager.acquireWriteLock();
             for (Cartridge cartridge : cartridgeList) {
                 if (!topology.serviceExists(cartridge.getType())) {
-                    ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant;
+
+                    ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant :
+                            ServiceType.SingleTenant;
                     service = new Service(cartridge.getType(), serviceType, cartridge.getUuid());
+
                     Properties properties = new Properties();
 
                     try {
@@ -199,867 +203,899 @@ public class TopologyBuilder {
         }
 
         log.debug("Creating cluster port mappings: [application-id] " + appUuid);
-        for(Cluster cluster : appClusters) {
+        for (Cluster cluster : appClusters) {
             String cartridgeUuid = cluster.getServiceUuid();
             Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeUuid);
-            if(cartridge == null) {
+            if (cartridge == null) {
                 throw new CloudControllerException("Cartridge not found: [cartridge-uuid] " + cartridgeUuid);
             }
 
-            for(PortMapping portMapping : cartridge.getPortMappings()) {
+            for (PortMapping portMapping : cartridge.getPortMappings()) {
                 ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appUuid,
                         cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(),
-                        portMapping.getProxyPort());
-                CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
-                log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
-            }
-        }
-
-        // Persist cluster port mappings
-        CloudControllerContext.getInstance().persist();
-
-        // Send application clusters created event
-        TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
-    }
-
-    public static void handleApplicationClustersRemoved(String appId,
-                                                        Set<ClusterDataHolder> clusterData) {
-        TopologyManager.acquireWriteLock();
-
-        List<Cluster> removedClusters = new ArrayList<Cluster>();
-        CloudControllerContext context = CloudControllerContext.getInstance();
-        try {
-            Topology topology = TopologyManager.getTopology();
-
-            if (clusterData != null) {
-                // remove clusters from CC topology model and remove runtime information
-                for (ClusterDataHolder aClusterData : clusterData) {
-                    Service aService = topology.getService(aClusterData.getServiceUuid());
-                    if (aService != null) {
-                        removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
-                    } else {
-                        log.warn("Service " + aClusterData.getServiceType() + " not found, " +
-                                "unable to remove Cluster " + aClusterData.getClusterId());
+                                portMapping.getProxyPort());
+                        CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
+                        log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
                     }
-                    // remove runtime data
-                    context.removeClusterContext(aClusterData.getClusterId());
-
-                    log.info("Removed application [ " + appId + " ]'s Cluster " +
-                            "[ " + aClusterData.getClusterId() + " ] from the topology");
                 }
-                // persist runtime data changes
-                CloudControllerContext.getInstance().persist();
-            } else {
-                log.info("No cluster data found for application " + appId + " to remove");
-            }
 
-            TopologyManager.updateTopology(topology);
+                // Persist cluster port mappings
+                CloudControllerContext.getInstance().persist();
 
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
+                // Send application clusters created event
+                TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
+            }
 
-        // Remove cluster port mappings of application
-        CloudControllerContext.getInstance().removeClusterPortMappings(appId);
-        CloudControllerContext.getInstance().persist();
+        public static void handleApplicationClustersRemoved (String appId,
+                Set < ClusterDataHolder > clusterData){
+            TopologyManager.acquireWriteLock();
 
-        TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
+            List<Cluster> removedClusters = new ArrayList<Cluster>();
+            CloudControllerContext context = CloudControllerContext.getInstance();
+            try {
+                Topology topology = TopologyManager.getTopology();
+
+                if (clusterData != null) {
+                    // remove clusters from CC topology model and remove runtime information
+                    for (ClusterDataHolder aClusterData : clusterData) {
+                        Service aService = topology.getService(aClusterData.getServiceUuid());
+                        if (aService != null) {
+                            removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
+                        } else {
+                            log.warn("Service " + aClusterData.getServiceType() + " not found, " +
+                                    "unable to remove Cluster " + aClusterData.getClusterId());
+                        }
+                        // remove runtime data
+                        context.removeClusterContext(aClusterData.getClusterId());
 
-    }
+                        log.info("Removed application [ " + appId + " ]'s Cluster " +
+                                "[ " + aClusterData.getClusterId() + " ] from the topology");
+                    }
+                    // persist runtime data changes
+                    CloudControllerContext.getInstance().persist();
+                } else {
+                    log.info("No cluster data found for application " + appId + " to remove");
+                }
 
-    public static void handleClusterReset(ClusterStatusClusterResetEvent event) {
-        TopologyManager.acquireWriteLock();
+                TopologyManager.updateTopology(topology);
 
-        try {
-            Topology topology = TopologyManager.getTopology();
-            Service service = topology.getService(event.getServiceName());
-            if (service == null) {
-                log.error("Service " + event.getServiceName() +
-                        " not found in Topology, unable to update the cluster status to Created");
-                return;
+            } finally {
+                TopologyManager.releaseWriteLock();
             }
 
-            Cluster cluster = service.getCluster(event.getClusterId());
-            if (cluster == null) {
-                log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
-                        "status to Created");
-                return;
-            }
+            // Remove cluster port mappings of application
+            CloudControllerContext.getInstance().removeClusterPortMappings(appId);
+            CloudControllerContext.getInstance().persist();
 
-            ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
-            if (context == null) {
-                log.warn("Cluster Instance Context is not found for [cluster] " +
-                        event.getClusterId() + " [instance-id] " +
-                        event.getInstanceId());
-                return;
-            }
-            ClusterStatus status = ClusterStatus.Created;
-            if (context.isStateTransitionValid(status)) {
-                context.setStatus(status);
-                log.info("Cluster Created adding status started for" + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
-                //publishing data
-                TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
-                        event.getClusterId(), event.getInstanceId());
-            } else {
-                log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
-                                " [instance-id] %s [current-status] %s [status-requested] %s",
-                        event.getClusterId(), event.getInstanceId(),
-                        context.getStatus(), status));
-            }
+            TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
 
-        } finally {
-            TopologyManager.releaseWriteLock();
         }
 
+        public static void handleClusterReset (ClusterStatusClusterResetEvent event){
+            TopologyManager.acquireWriteLock();
 
-    }
-
-    public static void handleClusterInstanceCreated(String serviceUuid, String clusterId,
-                                                    String alias, String instanceId, String partitionId,
-                                                    String networkPartitionUuid) {
-
-        TopologyManager.acquireWriteLock();
+            try {
+                Topology topology = TopologyManager.getTopology();
+                Service service = topology.getService(event.getServiceName());
+                if (service == null) {
+                    log.error("Service " + event.getServiceName() +
+                            " not found in Topology, unable to update the cluster status to Created");
+                    return;
+                }
 
-        try {
-            Topology topology = TopologyManager.getTopology();
-            Service service = topology.getService(serviceUuid);
-            if (service == null) {
-                log.error("Service " + serviceUuid +
-                        " not found in Topology, unable to update the cluster status to Created");
-                return;
-            }
+                Cluster cluster = service.getCluster(event.getClusterId());
+                if (cluster == null) {
+                    log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
+                            "status to Created");
+                    return;
+                }
 
-            Cluster cluster = service.getCluster(clusterId);
-            if (cluster == null) {
-                log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
-                        "status to Created");
-                return;
-            }
+                ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+                if (context == null) {
+                    log.warn("Cluster Instance Context is not found for [cluster] " +
+                            event.getClusterId() + " [instance-id] " +
+                            event.getInstanceId());
+                    return;
+                }
+                ClusterStatus status = ClusterStatus.Created;
+                if (context.isStateTransitionValid(status)) {
+                    context.setStatus(status);
+                    log.info("Cluster Created adding status started for" + cluster.getClusterId());
+                    TopologyManager.updateTopology(topology);
+                    //publishing data
+                    TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
+                            event.getClusterId(), event.getInstanceId());
+                } else {
+                    log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+                                    " [instance-id] %s [current-status] %s [status-requested] %s",
+                            event.getClusterId(), event.getInstanceId(),
+                            context.getStatus(), status));
+                }
 
-            if (cluster.getInstanceContexts(instanceId) != null) {
-                log.warn("The Instance context for the cluster already exists for [cluster] " +
-                        clusterId + " [instance-id] " + instanceId);
-                return;
+            } finally {
+                TopologyManager.releaseWriteLock();
             }
 
-            ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId);
-            clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
-            clusterInstance.setPartitionId(partitionId);
-            cluster.addInstanceContext(instanceId, clusterInstance);
-            TopologyManager.updateTopology(topology);
-
-            ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
-                    new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
-                            clusterInstance);
-            clusterInstanceCreatedEvent.setPartitionId(partitionId);
-            TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
-
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-    }
-
 
-    public static void handleClusterRemoved(ClusterContext ctxt) {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(ctxt.getCartridgeUuid());
-        String deploymentPolicy;
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    ctxt.getCartridgeUuid()));
-            return;
         }
 
-        if (!service.clusterExists(ctxt.getClusterId())) {
-            log.warn(String.format("Cluster %s does not exist for service %s",
-                    ctxt.getClusterId(),
-                    ctxt.getCartridgeUuid()));
-            return;
-        }
+        /**
+         * Adds a new cluster instance to a cluster in the topology and publishes a
+         * cluster instance created event.
+         * <p>
+         * The lookup, the modification and the event publication all run while the topology
+         * write lock is held. Nothing is modified (only a log entry is written) when the service
+         * or the cluster cannot be found, or when an instance context already exists for the
+         * given instance id.
+         *
+         * @param serviceUuid          uuid used to look up the service in the topology
+         * @param clusterId            id used to look up the cluster within the service
+         * @param alias                alias handed to the new ClusterInstance
+         * @param instanceId           id of the cluster instance to create
+         * @param partitionId          partition id set on the instance and on the published event
+         * @param networkPartitionUuid network partition uuid set on the instance
+         */
+        public static void handleClusterInstanceCreated (String serviceUuid, String clusterId,
+                String alias, String instanceId, String partitionId,
+                String networkPartitionUuid){
 
-        try {
             TopologyManager.acquireWriteLock();
-            Cluster cluster = service.removeCluster(ctxt.getClusterId());
-            deploymentPolicy = cluster.getDeploymentPolicyUuid();
-            TopologyManager.updateTopology(topology);
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-        TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
-    }
 
-    /**
-     * Add member object to the topology and publish member created event
-     *
-     * @param memberContext
-     */
-    public static void handleMemberCreatedEvent(MemberContext memberContext) {
-        Topology topology = TopologyManager.getTopology();
+            try {
+                Topology topology = TopologyManager.getTopology();
+                Service service = topology.getService(serviceUuid);
+                // NOTE(review): the two error messages below talk about updating the cluster
+                // status to Created, although this method creates a cluster instance - confirm wording
+                if (service == null) {
+                    log.error("Service " + serviceUuid +
+                            " not found in Topology, unable to update the cluster status to Created");
+                    return;
+                }
 
-        Service service = topology.getService(memberContext.getCartridgeType());
-        String clusterId = memberContext.getClusterId();
-        Cluster cluster = service.getCluster(clusterId);
-        String memberId = memberContext.getMemberId();
-        String clusterInstanceId = memberContext.getClusterInstanceId();
-        String networkPartitionId = memberContext.getNetworkPartitionId();
-        String partitionId = memberContext.getPartition().getUuid();
-        String lbClusterId = memberContext.getLbClusterId();
-        long initTime = memberContext.getInitTime();
-
-        if (cluster.memberExists(memberId)) {
-            log.warn(String.format("Member %s already exists", memberId));
-            return;
-        }
+                Cluster cluster = service.getCluster(clusterId);
+                if (cluster == null) {
+                    log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
+                            "status to Created");
+                    return;
+                }
 
-        try {
-            TopologyManager.acquireWriteLock();
-            Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
-                    networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
-            member.setStatus(MemberStatus.Created);
-            member.setLbClusterId(lbClusterId);
-            member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
-            cluster.addMember(member);
-            TopologyManager.updateTopology(topology);
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
+                // Leave an already registered instance context untouched
+                if (cluster.getInstanceContexts(instanceId) != null) {
+                    log.warn("The Instance context for the cluster already exists for [cluster] " +
+                            clusterId + " [instance-id] " + instanceId);
+                    return;
+                }
 
-        TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
-    }
+                // Register the new instance in the cluster and store the updated topology
+                ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId);
+                clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
+                clusterInstance.setPartitionId(partitionId);
+                cluster.addInstanceContext(instanceId, clusterInstance);
+                TopologyManager.updateTopology(topology);
 
-    /**
-     * Update member status to initialized and publish member initialized event
-     *
-     * @param memberContext
-     */
-    public static void handleMemberInitializedEvent(MemberContext memberContext) {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(memberContext.getCartridgeType());
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    memberContext.getCartridgeType()));
-            return;
-        }
-        if (!service.clusterExists(memberContext.getClusterId())) {
-            log.warn(String.format("Cluster %s does not exist in service %s",
-                    memberContext.getClusterId(),
-                    memberContext.getCartridgeType()));
-            return;
-        }
+                // Publish the event (still inside the write lock)
+                ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+                        new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
+                                clusterInstance);
+                clusterInstanceCreatedEvent.setPartitionId(partitionId);
+                TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
 
-        Member member = service.getCluster(memberContext.getClusterId()).
-                getMember(memberContext.getMemberId());
-        if (member == null) {
-            log.warn(String.format("Member %s does not exist",
-                    memberContext.getMemberId()));
-            return;
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
         }
 
-        try {
-            TopologyManager.acquireWriteLock();
 
-            // Set ip addresses
-            member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
-            if (memberContext.getPrivateIPs() != null) {
-                member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
-            }
-            member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
-            if (memberContext.getPublicIPs() != null) {
-                member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+        /**
+         * Removes a cluster from its service in the topology and publishes a cluster removed
+         * event carrying the deployment policy uuid of the removed cluster.
+         * <p>
+         * Logs a warning and returns without changes when the service, or the cluster within
+         * that service, does not exist.
+         *
+         * @param ctxt cluster context providing the cartridge uuid (service lookup key) and the
+         *             cluster id of the cluster to remove
+         */
+        public static void handleClusterRemoved (ClusterContext ctxt){
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(ctxt.getCartridgeUuid());
+            String deploymentPolicy;
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        ctxt.getCartridgeUuid()));
+                return;
             }
 
-            // try update lifecycle state
-            if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
-                log.error("Invalid state transition from " + member.getStatus() + " to " +
-                        MemberStatus.Initialized);
+            // NOTE(review): the existence checks run before the write lock is acquired - confirm
+            // that no other writer can remove the cluster in between
+            if (!service.clusterExists(ctxt.getClusterId())) {
+                log.warn(String.format("Cluster %s does not exist for service %s",
+                        ctxt.getClusterId(),
+                        ctxt.getCartridgeUuid()));
                 return;
-            } else {
-                member.setStatus(MemberStatus.Initialized);
-                log.info("Member status updated to initialized");
+            }
 
+            try {
+                TopologyManager.acquireWriteLock();
+                // NOTE(review): the result of removeCluster is dereferenced without a null check
+                Cluster cluster = service.removeCluster(ctxt.getClusterId());
+                deploymentPolicy = cluster.getDeploymentPolicyUuid();
                 TopologyManager.updateTopology(topology);
-
-                TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
-                //publishing data
-                BAMUsageDataPublisher.publish(memberContext.getMemberId(),
-                        memberContext.getPartition().getUuid(),
-                        memberContext.getNetworkPartitionId(),
-                        memberContext.getClusterId(),
-                        memberContext.getCartridgeType(),
-                        MemberStatus.Initialized.toString(),
-                        null);
+            } finally {
+                TopologyManager.releaseWriteLock();
             }
-        } finally {
-            TopologyManager.releaseWriteLock();
+            // The event is published after the write lock has been released
+            TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
         }
-    }
 
-    private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices,
-                                                 PortMapping portMapping) {
-        for (KubernetesService kubernetesService : kubernetesServices) {
-            if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
-                return kubernetesService.getPort();
+        /**
+         * Add member object to the topology and publish member created event.
+         * <p>
+         * A new {@link Member} in status Created is added to the cluster named by the member
+         * context, the topology is stored, the member status is published through
+         * BAMUsageDataPublisher and finally a member created event is sent.
+         * <p>
+         * A warning is logged and nothing is changed when the service or the cluster does not
+         * exist, or when the member is already part of the cluster. The scaling reason and
+         * scaling time are optional: when the corresponding properties are missing (or the
+         * scaling time is not a valid number) they are published as null instead of failing
+         * the whole member creation.
+         *
+         * @param memberContext context describing the member that was created
+         */
+        public static void handleMemberCreatedEvent(MemberContext memberContext) {
+            Topology topology = TopologyManager.getTopology();
+
+            Service service = topology.getService(memberContext.getCartridgeType());
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        memberContext.getCartridgeType()));
+                return;
+            }
+
+            String clusterId = memberContext.getClusterId();
+            Cluster cluster = service.getCluster(clusterId);
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist in service %s",
+                        clusterId, memberContext.getCartridgeType()));
+                return;
+            }
+
+            String memberId = memberContext.getMemberId();
+            if (cluster.memberExists(memberId)) {
+                log.warn(String.format("Member %s already exists", memberId));
+                return;
             }
+
+            String clusterInstanceId = memberContext.getClusterInstanceId();
+            String networkPartitionId = memberContext.getNetworkPartitionId();
+            String partitionId = memberContext.getPartition().getUuid();
+            String lbClusterId = memberContext.getLbClusterId();
+            long initTime = memberContext.getInitTime();
+
+            // Scaling details are only present when the member context carries them;
+            // missing or malformed values are published as null
+            String autoscalingReason = readMemberContextProperty(memberContext,
+                    StratosConstants.SCALING_REASON);
+            Long scalingTime = null;
+            String scalingTimeValue = readMemberContextProperty(memberContext,
+                    StratosConstants.SCALING_TIME);
+            if (scalingTimeValue != null) {
+                try {
+                    scalingTime = Long.parseLong(scalingTimeValue);
+                } catch (NumberFormatException e) {
+                    log.warn(String.format("Invalid scaling time [%s] found for member %s",
+                            scalingTimeValue, memberId), e);
+                }
+            }
+
+            // Acquire the lock before entering try, so that finally never releases
+            // a lock which was not acquired
+            TopologyManager.acquireWriteLock();
+            try {
+                Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
+                        networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
+                member.setStatus(MemberStatus.Created);
+                member.setLbClusterId(lbClusterId);
+                member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
+                cluster.addMember(member);
+                TopologyManager.updateTopology(topology);
+                //member created time
+                Long timeStamp = System.currentTimeMillis();
+                //publishing to BAM
+                // NOTE(review): the partition is published via getId() here while the Member above
+                // is built with getUuid(); in addition clusterId is passed before clusterInstanceId,
+                // which is the opposite order of handleMemberInitializedEvent/handleMemberStarted -
+                // confirm both against the BAMUsageDataPublisher.publish signature
+                BAMUsageDataPublisher
+                        .publish(memberId,
+                                memberContext.getPartition().getId(),
+                                networkPartitionId,
+                                clusterId,
+                                clusterInstanceId,
+                                memberContext.getCartridgeType(),
+                                MemberStatus.Created.toString(),
+                                timeStamp, autoscalingReason,
+                                scalingTime, null);
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
+
+            TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
+        }
+
+        /**
+         * Reads a property value from the member context without failing on missing data.
+         *
+         * @param memberContext member context holding the properties
+         * @param key           name of the property to read
+         * @return the property value, or null when the member context has no properties or no
+         * property with the given name
+         */
+        private static String readMemberContextProperty(MemberContext memberContext, String key) {
+            if (memberContext.getProperties() == null
+                    || memberContext.getProperties().getProperty(key) == null) {
+                return null;
+            }
+            return memberContext.getProperties().getProperty(key).getValue();
         }
-        throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] "
-                + portMapping.getPort());
-    }
 
-    public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
-        try {
+        /**
+         * Update member status to initialized and publish member initialized event.
+         * <p>
+         * Copies the private/public ip addresses of the member context to the member in the
+         * topology, moves the member to status Initialized when that state transition is valid,
+         * stores the topology, sends the member initialized event and publishes the new status
+         * through BAMUsageDataPublisher. A log entry is written and nothing is published when
+         * the service, cluster or member does not exist or the state transition is invalid.
+         *
+         * @param memberContext context of the member that was initialized
+         */
+        public static void handleMemberInitializedEvent (MemberContext memberContext){
             Topology topology = TopologyManager.getTopology();
-            Service service = topology.getService(instanceStartedEvent.getServiceName());
+            Service service = topology.getService(memberContext.getCartridgeType());
             if (service == null) {
                 log.warn(String.format("Service %s does not exist",
-                        instanceStartedEvent.getServiceName()));
+                        memberContext.getCartridgeType()));
                 return;
             }
-            if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+            if (!service.clusterExists(memberContext.getClusterId())) {
                 log.warn(String.format("Cluster %s does not exist in service %s",
-                        instanceStartedEvent.getClusterId(),
-                        instanceStartedEvent.getServiceName()));
+                        memberContext.getClusterId(),
+                        memberContext.getCartridgeType()));
                 return;
             }
 
-            Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
-            Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+            Member member = service.getCluster(memberContext.getClusterId()).
+                    getMember(memberContext.getMemberId());
             if (member == null) {
                 log.warn(String.format("Member %s does not exist",
-                        instanceStartedEvent.getMemberId()));
+                        memberContext.getMemberId()));
                 return;
             }
 
             try {
                 TopologyManager.acquireWriteLock();
+
+                // Set ip addresses
+                // (these are applied before the state transition is validated below)
+                member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
+                if (memberContext.getPrivateIPs() != null) {
+                    member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
+                }
+                member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
+                if (memberContext.getPublicIPs() != null) {
+                    member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+                }
+
                 // try update lifecycle state
-                if (!member.isStateTransitionValid(MemberStatus.Starting)) {
-                    log.error("Invalid State Transition from " + member.getStatus() + " to " +
-                            MemberStatus.Starting);
+                if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
+                    log.error("Invalid state transition from " + member.getStatus() + " to " +
+                            MemberStatus.Initialized);
                     return;
                 } else {
-                    member.setStatus(MemberStatus.Starting);
-                    log.info("member started event adding status started");
+                    member.setStatus(MemberStatus.Initialized);
+                    log.info("Member status updated to initialized");
 
                     TopologyManager.updateTopology(topology);
-                    //memberStartedEvent.
-                    TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+                    //member initialized time
+                    Long timeStamp = System.currentTimeMillis();
+                    TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
                     //publishing data
-                    BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
-                            instanceStartedEvent.getPartitionId(),
-                            instanceStartedEvent.getNetworkPartitionId(),
-                            instanceStartedEvent.getClusterId(),
-                            instanceStartedEvent.getServiceName(),
-                            MemberStatus.Starting.toString(),
-                            null);
+                    // NOTE(review): clusterInstanceId is passed before clusterId here, whereas
+                    // handleMemberCreatedEvent passes clusterId first - confirm the expected
+                    // parameter order of BAMUsageDataPublisher.publish
+                    BAMUsageDataPublisher.publish(memberContext.getMemberId(),
+                            memberContext.getPartition().getUuid(),
+                            memberContext.getNetworkPartitionId(),
+                            memberContext.getClusterInstanceId(),
+                            memberContext.getClusterId(),
+                            memberContext.getCartridgeType(),
+                            MemberStatus.Initialized.toString(),
+                            timeStamp, null, null, null);
                 }
             } finally {
                 TopologyManager.releaseWriteLock();
             }
-        } catch (Exception e) {
-            String message = String.format("Could not handle member started event: [application-id] %s " +
-                            "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(),
-                    instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId());
-            log.warn(message, e);
-        }
-    }
-
-    public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(instanceActivatedEvent.getServiceName());
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    instanceActivatedEvent.getServiceName()));
-            return;
         }
 
-        Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
-        if (cluster == null) {
-            log.warn(String.format("Cluster %s does not exist",
-                    instanceActivatedEvent.getClusterId()));
-            return;
+        /**
+         * Looks up the port of the first kubernetes service whose protocol equals the protocol
+         * of the given port mapping.
+         *
+         * @param clusterId          cluster id, only used in the error message
+         * @param kubernetesServices kubernetes services to search, in order
+         * @param portMapping        port mapping whose protocol is matched
+         * @return port of the first kubernetes service with a matching protocol
+         * @throws RuntimeException if no kubernetes service uses the protocol of the port mapping
+         */
+        private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices,
+                                                     PortMapping portMapping) {
+            for (KubernetesService candidate : kubernetesServices) {
+                boolean sameProtocol = candidate.getProtocol().equals(portMapping.getProtocol());
+                if (!sameProtocol) {
+                    continue;
+                }
+                return candidate.getPort();
+            }
+            // No service matched: report the cluster and the port that could not be resolved
+            String notFoundMessage = "Kubernetes service port not found: [cluster-id] " + clusterId + " [port] "
+                    + portMapping.getPort();
+            throw new RuntimeException(notFoundMessage);
         }
 
-        Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
-        if (member == null) {
-            log.warn(String.format("Member %s does not exist",
-                    instanceActivatedEvent.getMemberId()));
-            return;
-        }
+        /**
+         * Updates the member status to Starting and publishes a member started event.
+         * <p>
+         * When the state transition is valid the member is moved to status Starting, the
+         * topology is stored, the member started event is sent and the new status is published
+         * through BAMUsageDataPublisher. A log entry is written and nothing is published when
+         * the service, cluster or member does not exist or the state transition is invalid.
+         * Any exception raised while handling the event is caught and logged as a warning.
+         *
+         * @param instanceStartedEvent event providing the service name, cluster id and member id
+         *                             of the member that was started
+         */
+        public static void handleMemberStarted (InstanceStartedEvent instanceStartedEvent){
+            try {
+                Topology topology = TopologyManager.getTopology();
+                Service service = topology.getService(instanceStartedEvent.getServiceName());
+                if (service == null) {
+                    log.warn(String.format("Service %s does not exist",
+                            instanceStartedEvent.getServiceName()));
+                    return;
+                }
+                if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+                    log.warn(String.format("Cluster %s does not exist in service %s",
+                            instanceStartedEvent.getClusterId(),
+                            instanceStartedEvent.getServiceName()));
+                    return;
+                }
 
-        MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
-                instanceActivatedEvent.getServiceName(),
-                instanceActivatedEvent.getClusterId(),
-                instanceActivatedEvent.getClusterInstanceId(),
-                instanceActivatedEvent.getMemberId(),
-                instanceActivatedEvent.getNetworkPartitionId(),
-                instanceActivatedEvent.getPartitionId());
-
-        // grouping - set grouid
-        //TODO
-        memberActivatedEvent.setApplicationId(null);
-        try {
-            TopologyManager.acquireWriteLock();
-            // try update lifecycle state
-            if (!member.isStateTransitionValid(MemberStatus.Active)) {
-                log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
-                return;
-            } else {
-                member.setStatus(MemberStatus.Active);
+                Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
+                Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+                if (member == null) {
+                    log.warn(String.format("Member %s does not exist",
+                            instanceStartedEvent.getMemberId()));
+                    return;
+                }
 
-                // Set member ports
                 try {
-                    Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
-                    if (cartridge == null) {
-                        throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
-                                service.getServiceName()));
-                    }
-
-                    Port port;
-                    int portValue;
-                    List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
-                    String clusterId = cluster.getClusterId();
-                    ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
-                    List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+                    TopologyManager.acquireWriteLock();
+                    // try update lifecycle state
+                    if (!member.isStateTransitionValid(MemberStatus.Starting)) {
+                        log.error("Invalid State Transition from " + member.getStatus() + " to " +
+                                MemberStatus.Starting);
+                        return;
+                    } else {
+                        member.setStatus(MemberStatus.Starting);
+                        log.info("member started event adding status started");
 
-                    for (PortMapping portMapping : portMappings) {
-                        if (kubernetesServices != null) {
-                            portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
-                        } else {
-                            portValue = portMapping.getPort();
-                        }
-                        port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
-                        member.addPort(port);
-                        memberActivatedEvent.addPort(port);
+                        TopologyManager.updateTopology(topology);
+                        //member started time
+                        Long timeStamp = System.currentTimeMillis();
+                        //memberStartedEvent.
+                        TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+                        //publishing data
+                        // NOTE(review): clusterInstanceId is passed before clusterId here, whereas
+                        // handleMemberCreatedEvent passes clusterId first - confirm the expected
+                        // parameter order of BAMUsageDataPublisher.publish
+                        BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
+                                instanceStartedEvent.getPartitionId(),
+                                instanceStartedEvent.getNetworkPartitionId(),
+                                instanceStartedEvent.getClusterInstanceId(),
+                                instanceStartedEvent.getClusterId(),
+                                instanceStartedEvent.getServiceName(),
+                                MemberStatus.Starting.toString(),
+                                timeStamp, null, null, null);
                     }
-                } catch (Exception e) {
-                    String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
-                            memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId());
-                    log.error(message, e);
+                } finally {
+                    TopologyManager.releaseWriteLock();
                 }
-
-                // Set member ip addresses
-                memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
-                memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
-                memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
-                memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
-                TopologyManager.updateTopology(topology);
-
-                // Publish member activated event
-                TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
-
-                // Publish statistics data
-                BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
-                        memberActivatedEvent.getPartitionId(),
-                        memberActivatedEvent.getNetworkPartitionId(),
-                        memberActivatedEvent.getClusterId(),
-                        memberActivatedEvent.getServiceName(),
-                        MemberStatus.Active.toString(),
-                        null);
+            } catch (Exception e) {
+                // Failures are logged and swallowed so that they do not reach the caller
+                String message = String.format("Could not handle member started event: [application-id] %s " +
+                                "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(),
+                        instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId());
+                log.warn(message, e);
             }
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-    }
-
-    public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
-            throws InvalidMemberException, InvalidCartridgeTypeException {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
-        //update the status of the member
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    instanceReadyToShutdownEvent.getServiceName()));
-            return;
-        }
-
-        Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
-        if (cluster == null) {
-            log.warn(String.format("Cluster %s does not exist",
-                    instanceReadyToShutdownEvent.getClusterId()));
-            return;
         }
 
+        public static void handleMemberActivated (InstanceActivatedEvent instanceActivatedEvent){
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(instanceActivatedEvent.getServiceName());
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        instanceActivatedEvent.getServiceName()));
+                return;
+            }
 
-        Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
-        if (member == null) {
-            log.warn(String.format("Member %s does not exist",
-                    instanceReadyToShutdownEvent.getMemberId()));
-            return;
-        }
-        MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
-                instanceReadyToShutdownEvent.getServiceName(),
-                instanceReadyToShutdownEvent.getClusterId(),
-                instanceReadyToShutdownEvent.getClusterInstanceId(),
-                instanceReadyToShutdownEvent.getMemberId(),
-                instanceReadyToShutdownEvent.getNetworkPartitionId(),
-                instanceReadyToShutdownEvent.getPartitionId());
-        try {
-            TopologyManager.acquireWriteLock();
+            Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist",
+                        instanceActivatedEvent.getClusterId()));
+                return;
+            }
 
-            if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
-                log.error("Invalid State Transition from " + member.getStatus() + " to " +
-                        MemberStatus.ReadyToShutDown);
+            Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
+            if (member == null) {
+                log.warn(String.format("Member %s does not exist",
+                        instanceActivatedEvent.getMemberId()));
                 return;
             }
-            member.setStatus(MemberStatus.ReadyToShutDown);
-            log.info("Member Ready to shut down event adding status started");
 
-            TopologyManager.updateTopology(topology);
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-        TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-        //publishing data
-        BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
-                instanceReadyToShutdownEvent.getPartitionId(),
-                instanceReadyToShutdownEvent.getNetworkPartitionId(),
-                instanceReadyToShutdownEvent.getClusterId(),
-                instanceReadyToShutdownEvent.getServiceName(),
-                MemberStatus.ReadyToShutDown.toString(),
-                null);
-        //termination of particular instance will be handled by autoscaler
-    }
+            MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
+                    instanceActivatedEvent.getServiceName(),
+                    instanceActivatedEvent.getClusterId(),
+                    instanceActivatedEvent.getClusterInstanceId(),
+                    instanceActivatedEvent.getMemberId(),
+                    instanceActivatedEvent.getNetworkPartitionId(),
+                    instanceActivatedEvent.getPartitionId());
+
+            // grouping - set grouid
+            //TODO
+            memberActivatedEvent.setApplicationId(null);
+            try {
+                TopologyManager.acquireWriteLock();
+                // try update lifecycle state
+                if (!member.isStateTransitionValid(MemberStatus.Active)) {
+                    log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
+                            MemberStatus.Active + "]");
+                    return;
+                } else {
+                    member.setStatus(MemberStatus.Active);
 
-    public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
-            throws InvalidMemberException, InvalidCartridgeTypeException {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
-        //update the status of the member
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    instanceMaintenanceModeEvent.getServiceName()));
-            return;
-        }
+                    // Set member ports
+                    try {
+                        Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
+                        if (cartridge == null) {
+                            throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
+                                    service.getServiceName()));
+                        }
 
-        Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
-        if (cluster == null) {
-            log.warn(String.format("Cluster %s does not exist",
-                    instanceMaintenanceModeEvent.getClusterId()));
-            return;
-        }
+                        Port port;
+                        int portValue;
+                        List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
+                        String clusterId = cluster.getClusterId();
+                        ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+                        List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
 
-        Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
-        if (member == null) {
-            log.warn(String.format("Member %s does not exist",
-                    instanceMaintenanceModeEvent.getMemberId()));
-            return;
+                        for (PortMapping portMapping : portMappings) {
+                            if (kubernetesServices != null) {
+                                portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
+                            } else {
+                                portValue = portMapping.getPort();
+                            }
+                            port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
+                            member.addPort(port);
+                            memberActivatedEvent.addPort(port);
+                        }
+                    } catch (Exception e) {
+                        String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
+                                memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId());
+                        log.error(message, e);
+                    }
+
+                    // Set member ip addresses
+                    memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
+                    memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
+                    memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
+                    memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
+                    TopologyManager.updateTopology(topology);
+                    //member activated time
+                    Long timeStamp = System.currentTimeMillis();
+                    // Publish member activated event
+                    TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+
+                    // Publish statistics data
+                    BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
+                            memberActivatedEvent.getPartitionId(),
+                            memberActivatedEvent.getNetworkPartitionId(),
+                            memberActivatedEvent.getClusterInstanceId(),
+                            memberActivatedEvent.getClusterId(),
+                            memberActivatedEvent.getServiceName(),
+                            MemberStatus.Active.toString(),
+                            timeStamp, null, null, null);
+                }
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
         }
 
+        public static void handleMemberReadyToShutdown (InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
+        throws InvalidMemberException, InvalidCartridgeTypeException {
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
+            // Marks the member ReadyToShutDown: resolve service, cluster and member first; warn and return if any is missing.
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        instanceReadyToShutdownEvent.getServiceName()));
+                return;
+            }
 
-        MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
-                instanceMaintenanceModeEvent.getServiceName(),
-                instanceMaintenanceModeEvent.getClusterId(),
-                instanceMaintenanceModeEvent.getClusterInstanceId(),
-                instanceMaintenanceModeEvent.getMemberId(),
-                instanceMaintenanceModeEvent.getNetworkPartitionId(),
-                instanceMaintenanceModeEvent.getPartitionId());
-        try {
-            TopologyManager.acquireWriteLock();
-            // try update lifecycle state
-            if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
-                log.error("Invalid State Transition from " + member.getStatus() + " to "
-                        + MemberStatus.In_Maintenance);
+            Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist",
+                        instanceReadyToShutdownEvent.getClusterId()));
                 return;
             }
-            member.setStatus(MemberStatus.In_Maintenance);
-            log.info("member maintenance mode event adding status started");
 
-            TopologyManager.updateTopology(topology);
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-        //publishing data
-        TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
 
-    }
+            Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
+            if (member == null) {
+                log.warn(String.format("Member %s does not exist",
+                        instanceReadyToShutdownEvent.getMemberId()));
+                return;
+            }
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
+                    instanceReadyToShutdownEvent.getServiceName(),
+                    instanceReadyToShutdownEvent.getClusterId(),
+                    instanceReadyToShutdownEvent.getClusterInstanceId(),
+                    instanceReadyToShutdownEvent.getMemberId(),
+                    instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                    instanceReadyToShutdownEvent.getPartitionId());
+            // Time of the ReadyToShutDown state change; assigned after the topology update below, inside the write lock.
+            Long timeStamp = null;
+            try {
+                TopologyManager.acquireWriteLock();
 
-    /**
-     * Remove member from topology and send member terminated event.
-     *
-     * @param serviceName
-     * @param clusterId
-     * @param networkPartitionId
-     * @param partitionId
-     * @param memberId
-     */
-    public static void handleMemberTerminated(String serviceName, String clusterId,
-                                              String networkPartitionId, String partitionId,
-                                              String memberId) {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(serviceName);
-        Properties properties;
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    serviceName));
-            return;
-        }
-        Cluster cluster = service.getCluster(clusterId);
-        if (cluster == null) {
-            log.warn(String.format("Cluster %s does not exist",
-                    clusterId));
-            return;
-        }
+                if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
+                    log.error("Invalid State Transition from " + member.getStatus() + " to " +
+                            MemberStatus.ReadyToShutDown);
+                    return;
+                }
+                member.setStatus(MemberStatus.ReadyToShutDown);
+                log.info("Member Ready to shut down event adding status started");
 
-        Member member = cluster.getMember(memberId);
-        if (member == null) {
-            log.warn(String.format("Member %s does not exist",
-                    memberId));
-            return;
+                TopologyManager.updateTopology(topology);
+                timeStamp = System.currentTimeMillis();
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
+            TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+            // Publish the status change to BAM after the write lock has been released.
+            BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
+                    instanceReadyToShutdownEvent.getPartitionId(),
+                    instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                    instanceReadyToShutdownEvent.getClusterInstanceId(),
+                    instanceReadyToShutdownEvent.getClusterId(),
+                    instanceReadyToShutdownEvent.getServiceName(),
+                    MemberStatus.ReadyToShutDown.toString(),
+                    timeStamp, null, null, null);
+            // Termination of this particular instance will be handled by the autoscaler.
         }
 
-        String clusterInstanceId = member.getClusterInstanceId();
+        public static void handleMemberMaintenance (InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
+        throws InvalidMemberException, InvalidCartridgeTypeException {
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
+            // Moves the member to In_Maintenance: resolve service, cluster and member first; warn and return if any is missing.
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        instanceMaintenanceModeEvent.getServiceName()));
+                return;
+            }
 
-        try {
-            TopologyManager.acquireWriteLock();
-            properties = member.getProperties();
-            cluster.removeMember(member);
-            TopologyManager.updateTopology(topology);
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-        /* @TODO leftover from grouping_poc*/
-        String groupAlias = null;
-        TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
-                clusterInstanceId, networkPartitionId,
-                partitionId, properties, groupAlias);
-    }
+            Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist",
+                        instanceMaintenanceModeEvent.getClusterId()));
+                return;
+            }
 
-    public static void handleMemberSuspended() {
-        //TODO
-        try {
-            TopologyManager.acquireWriteLock();
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-    }
+            Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
+            if (member == null) {
+                log.warn(String.format("Member %s does not exist",
+                        instanceMaintenanceModeEvent.getMemberId()));
+                return;
+            }
 
-    public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) {
 
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
-        //update the status of the cluster
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    clusterStatusClusterActivatedEvent.getServiceName()));
-            return;
-        }
+            MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
+                    instanceMaintenanceModeEvent.getServiceName(),
+                    instanceMaintenanceModeEvent.getClusterId(),
+                    instanceMaintenanceModeEvent.getClusterInstanceId(),
+                    instanceMaintenanceModeEvent.getMemberId(),
+                    instanceMaintenanceModeEvent.getNetworkPartitionId(),
+                    instanceMaintenanceModeEvent.getPartitionId());
+            try {
+                TopologyManager.acquireWriteLock();
+                // Reject the event (log an error and return) unless the transition to In_Maintenance is valid.
+                if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
+                    log.error("Invalid State Transition from " + member.getStatus() + " to "
+                            + MemberStatus.In_Maintenance);
+                    return;
+                }
+                member.setStatus(MemberStatus.In_Maintenance);
+                log.info("member maintenance mode event adding status started");
 
-        Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
-        if (cluster == null) {
-            log.warn(String.format("Cluster %s does not exist",
-                    clusterStatusClusterActivatedEvent.getClusterId()));
-            return;
-        }
+                TopologyManager.updateTopology(topology);
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
+            // Publish the member maintenance mode event once the write lock has been released.
+            TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
 
-        String clusterId = cluster.getClusterId();
-        ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
-        if (clusterContext == null) {
-            log.warn("Cluster context not found: [cluster-id] " + clusterId);
-            return;
         }
 
-        ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
-                new ClusterInstanceActivatedEvent(
-                        clusterStatusClusterActivatedEvent.getAppId(),
-                        clusterStatusClusterActivatedEvent.getServiceName(),
-                        clusterStatusClusterActivatedEvent.getClusterId(),
-                        clusterStatusClusterActivatedEvent.getInstanceId());
-        try {
-            TopologyManager.acquireWriteLock();
-            List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
-            cluster.setKubernetesServices(kubernetesServices);
-            clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
-
-            ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
-            if (context == null) {
-                log.warn("Cluster instance context is not found for [cluster] " +
-                        clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
-                        clusterStatusClusterActivatedEvent.getInstanceId());
+        /**
+         * Remove member from topology and send member terminated event.
+         *
+         * @param serviceName        name of the service looked up in the topology
+         * @param clusterId          id of the cluster looked up in that service
+         * @param networkPartitionId network partition id forwarded in the member terminated event
+         * @param partitionId        partition id forwarded in the member terminated event
+         * @param memberId           id of the member to remove from the cluster
+         */
+        public static void handleMemberTerminated (String serviceName, String clusterId,
+                String networkPartitionId, String partitionId,
+                String memberId){
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(serviceName);
+            Properties properties;
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        serviceName));
                 return;
             }
-            ClusterStatus status = ClusterStatus.Active;
-            if (context.isStateTransitionValid(status)) {
-                context.setStatus(status);
-                log.info("Cluster activated adding status started for " + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
-                // publish event
-                TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
-            } else {
-                log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
-                                " [instance-id] %s [current-status] %s [status-requested] %s",
-                        clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(),
-                        context.getStatus(), status));
+            Cluster cluster = service.getCluster(clusterId);
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist",
+                        clusterId));
                 return;
             }
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
 
-    }
+            Member member = cluster.getMember(memberId);
+            if (member == null) {
+                log.warn(String.format("Member %s does not exist",
+                        memberId));
+                return;
+            }
 
-    public static void handleClusterInactivateEvent(
-            ClusterStatusClusterInactivateEvent clusterInactivateEvent) {
-        Topology topology = TopologyManager.getTopology();
-        Service service = topology.getService(clusterInactivateEvent.getServiceName());
-        //update the status of the cluster
-        if (service == null) {
-            log.warn(String.format("Service %s does not exist",
-                    clusterInactivateEvent.getServiceName()));
-            return;
+            String clusterInstanceId = member.getClusterInstanceId();
+
+            try {
+                TopologyManager.acquireWriteLock();
+                properties = member.getProperties();
+                cluster.removeMember(member);
+                TopologyManager.updateTopology(topology);
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
+        /* TODO: leftover from grouping_poc; no group alias is resolved here, so null is sent. */
+            String groupAlias = null;
+            TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
+                    clusterInstanceId, networkPartitionId,
+                    partitionId, properties, groupAlias);
         }
 
-        Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId());
-        if (cluster == null) {
-            log.warn(String.format("Cluster %s does not exist",
-                    clusterInactivateEvent.getClusterId()));
-            return;
+        public static void handleMemberSuspended () {
+            // TODO: not implemented; currently only acquires and releases the topology write lock.
+            try {
+                TopologyManager.acquireWriteLock();
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
         }
 
-        ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
-                new ClusterInstanceInactivateEvent(
-                        clusterInactivateEvent.getAppId(),
-                        clusterInactivateEvent.getServiceName(),
-                        clusterInactivateEvent.getClusterId(),
-                        clusterInactivateEvent.getInstanceId());
-        try {
-            TopologyManager.acquireWriteLock();
-            ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
-            if (context == null) {
-                log.warn("Cluster Instance Context is not found for [cluster] " +
-                        clusterInactivateEvent.getClusterId() + " [instance-id] " +
-                        clusterInactivateEvent.getInstanceId());
+        public static void handleClusterActivatedEvent (ClusterStatusClusterActivatedEvent
+        clusterStatusClusterActivatedEvent){
+
+            Topology topology = TopologyManager.getTopology();
+            Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
+            // Activates a cluster instance: resolve service, cluster and cluster context first; warn and return if any is missing.
+            if (service == null) {
+                log.warn(String.format("Service %s does not exist",
+                        clusterStatusClusterActivatedEvent.getServiceName()));
                 return;
             }
-            ClusterStatus status = ClusterStatus.Inactive;
-            if (context.isStateTransitionValid(status)) {
-                context.setStatus(status);
-                log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
-                //publishing data
-                TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
-            } else {
-                log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
-                                " [instance-id] %s [current-status] %s [status-requested] %s",
-                        clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
-                        context.getStatus(), status));
+
+            Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
+            if (cluster == null) {
+                log.warn(String.format("Cluster %s does not exist",
+                        clusterStatusClusterActivatedEvent.getClusterId()));
                 return;
             }
-        } finally {
-            TopologyManager.releaseWriteLock();
-        }
-    }
 
+            String clusterId = cluster.getClusterId();
+            ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+            if (clusterContext == null) {
+                log.warn("Cluster context not found: [cluster-id] " + clusterId);
+                return;
+            }
 
-    private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) {
-        try {
-            MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
-            metadataClient.deleteApplicationProperties(event.getAppId());
-        } catch (Exception e) {
-            log.error("Error occurred while deleting the application resources frm metadata service ", e);
-        }
-    }
-
-    public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
+            ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
+                    new ClusterInstanceActivatedEvent(
+                            clusterStatusClusterActivatedEvent.getAppId(),
+                            clusterStatusClusterActivatedEvent.getServiceName(),
+                            clusterStatusClusterActivatedEvent.getClusterId(),
+                            clusterStatusClusterActivatedEvent.getInstanceId());
+            try {
+                TopologyManager.acquireWriteLock();
+                List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+                cluster.setKubernetesServices(kubernetesServices);
+                clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
+
+                ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
+                if (context == null) {
+                    log.warn("Cluster instance context is not found for [cluster] " +
+                            clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
+                            clusterStatusClusterActivatedEvent.getInstanceId());
+                    return;
+                }
+                ClusterStatus status = ClusterStatus.Active;
+                if (context.isStateTransitionValid(status)) {
+                    context.setStatus(status);
+                    log.info("Cluster activated adding status started for " + cluster.getClusterId());
+                    TopologyManager.updateTopology(topology);
+                    // Publish the cluster activated event while the write lock is still held.
+                    TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
+                } else {
+                    log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+                                    " [instance-id] %s [current-status] %s [status-requested] %s",
+                            clusterStatusClusterActivatedEvent.getClusterId(),
+                            clusterStatusClusterActivatedEvent.getInstanceId(),
+                            context.getStatus(), status));
+                    return;
+                }
+            } finally {
+                TopologyManager.releaseWriteLock();
+            }
 
-        TopologyManager.acquireWriteLock();
+        }
 
-        try {
+        public static void handleClusterInactivateEvent (
+                ClusterStatusClusterInactivateEvent clusterInactivateEvent){
             Topology topology = TopologyManager.getTopology();
-            Service service = topology.getService(event.getServiceName());
-
+            Service service = topology.getService(clusterInactivateEvent.getServiceName());
             //update the status of the cluster
             if (service == null) {
                 log.warn(String.format("Service %s does not exist",
-                        event.getServiceName()));
+                        clusterInactivateEvent.getServiceName()));
                 return;
             }
 
-            Cluster cluster = service.getCluster(event.getClusterId());
+            Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId());
             if (cluster == null) {
                 log.warn(String.format("Cluster %s does not exist",
-                        event.getClusterId()));
+                        clusterInactivateEvent.getClusterId()));
                 return;
             }
 
-            ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
-            if (context == null) {
-                log.warn("Cluster Instance Context is not found for [cluster] " +
-                        event.getClusterId() + " [instance-id] " +
-                        event.getInstanceId());
-                return;
+            ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
+                    new ClusterInstanceInactivateEvent(
+                            clusterInactivateEvent.getAppId(),
+                            clusterInactivateEvent.getServiceName(),
+                            clusterInactivateEvent.getClusterId(),
+                            clusterInactivateEvent.getInstanceId());
+            try {
+                TopologyManager.acquireWriteLock();
+                ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
+                if (context == null) {
+                    log.warn("Cluster Instance Context is not found for [cluster] " +
+                            clusterInactivateEvent.getClusterId() + " [instance-id] " +
+                            clusterInactivateEvent.getInstanceId());
+                    return;
+                }
+                ClusterStatus status = ClusterStatus.Inactive;
+                if (context.isStateTransitionValid(status)) {
+                    context.setStatus(status);
+                    log.info("Cluster Inactive adding status started for " + cluster.getClusterId());
+                    TopologyManager.updateTopology(topology);
+                    // Publish the cluster inactivate event while the write lock is still held.
+                    TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
+                } else {
+                    log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+                                    " [instance-id] %s [current-status] %s [status-requested] %s",
+                            clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
+                            context.getStatus(), status));
+                    return;
+                }
+            } finally {
+                TopologyManager.releaseWriteLock();
             }
-            ClusterStatus status = ClusterStatus.Terminated;
-            if (context.isStateTransitionValid(status)) {
-                context.setStatus(status);
-                log.info("Cluster Terminated adding status started for and removing the cluster instance"
-                        + cluster.getClusterId());
-                cluster.removeInstanceContext(event.getInstanceId());
-                TopologyManager.updateTopology(topology);
-                //publishing data
-                ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(),
-                        event.getServiceName(), event.getClusterId(), event.getInstanceId());
+        }
 
-                TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
-            } else {
-                log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
-                                " [instance-id] %s [current-status] %s [status-requested] %s",
-                        event.getClusterId(), event.getInstanceId(),
-                        context.getStatus(), status));
-                return;
+
+        private static void deleteAppResourcesFromMetadataService (ApplicationInstanceTerminatedEvent event){
+            try {
+                MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
+                metadataClient.deleteApplicationProperties(event.getAppId());
+            } catch (Exception e) { // best-effort cleanup: any failure is logged below and not rethrown
+                log.error("Error occurred while deleting the application resources from metadata service ", e);
             }
-        } finally {
-            TopologyManager.releaseWriteLock();
         }
 
+        public static void handleClusterTerminatedEvent (ClusterStatusClusterTerminatedEvent event){
 
-    }
+            TopologyManager.acquireWriteLock();
 
-    public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
+            try {
+                Topology topology = TopologyManager.getTopology();
+                Service service = topology.getService(event.getServiceName());
 
-        TopologyManager.acquireWriteLock();
+                //update the status of the cluster
+                if (service == null) {
+                    log.warn(String.format("Service %s does not exist",
+                            event.getServiceName()));
+                    return;
+                }
 
-        try {
-            Topology topology = TopologyManager.getTopology();
-            Cluster cluster = topology.getService(event.getServiceName()).
-                    getCluster(event.getClusterId());
+                Cluster cluster = service.getCluster(event.getClusterId());
+                if (cluster == null) {
+                    log.warn(String.format("Cluster %s does not exist",
+                            event.getClusterId()));
+                    return;
+                }
 
-            if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) {
-                log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " +
-                        ClusterStatus.Terminating);
-            }
-            ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
-            if (context == null) {
-                log.warn("Cluster Instance Context is not found for [cluster] " +
-                        event.getClusterId() + " [instance-id] " +
-                        event.getInstanceId());
-                return;
+                ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+                if (context == null) {
+                    log.warn("Cluster Instance Context is not found for [cluster] " +
+                            event.getClusterId() + " [instance-id] " +
+                            event.getInstanceId());
+                    return;
+                }
+                ClusterStatus status = ClusterStatus.Terminated;
+                if (context.isStateTransitionValid(status)) {
+                    context.setStatus(status);
+                    log.info("Cluster Terminated adding status started for and removing the cluster instance"
+                            + cluster.getClusterId());
+                    cluster.removeInstanceContext(event.getInstanceId());
+                    TopologyManager.updateTopology(topology);
+                    //publishing data
+                    ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(
+                            event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
+
+                    TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
+                } else {
+                    log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+                                    " [instance-id] %s [current-status] %s [status-requested] %s",
+                            event.getClusterId(), event.getInstanceId(),
+                            context.getStatus(), status));
+                    return;
+                }
+            } finally {
+                TopologyManager.releaseWriteLock();
             }
-            ClusterStatus status = ClusterStatus.Terminating;
-            if (context.isStateTransitionValid(status)) {
-                context.setStatus(status);
-                log.info("Cluster Terminating started for " + cluster.getClusterId());
-                TopologyManager.updateTopology(topology);
-                //publishing data
-                ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(),
-                        event.getServiceName(), event.getClusterId(), event.getInstanceId());
 
-                TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
 
-                // Remove kubernetes services if available
-                ClusterContext clusterContext =
-                        CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
-                if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
-                    KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+        }
+
+        public static void handleClusterTerminatingEvent (ClusterStatusClusterTerminatingEvent event){
+
+            TopologyManager.acquireWriteLock();
+
+            try {
+                Topology topology = TopologyManager.getTopology();
+                Cluster cluster = topology.getService(event.getServiceName()).
+                        getCluster(event.getClusterId());
+
+                if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) {
+                    log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " +
+                            ClusterStatus.Terminating);
                 }
-            } else {
-                log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
-                                " [instance-id] %s [current-status] %s [status-requested] %s",
-                        event.getClusterId(), event.getInstanceId(),
-                        context.getStatus(), status));
+                ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+                if (context == null) {
+                    log.warn("Cluster Instance Context is not found for [cluster] " +
+                            event.getClusterId() + " [instance-id] " +
+                            event.getInstanceId());
+                    return;
+                }
+                ClusterStatus status = ClusterStatus.Terminating;
+                if (context.isStateTransitionValid(status)) {
+                    context.setStatus(status);
+                    log.info("Cluster Terminating started for " + cluster.getClusterId());
+                    TopologyManager.updateTopology(topology);
+                    //publishing data
+                    ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(
+                            event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
+
+                    TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
+
+                    // Remove kubernetes services if available
+                    ClusterContext clusterContext =
+                            CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
+                    if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+                        KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+                    }
+                } else {
+                    log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+                                    " [instance-id] %s [current-status] %s [status-requested] %s",
+                            event.getClusterId(), event.getInstanceId(),
+                            context.getStatus(), status));
+                }
+            } finally {
+                TopologyManager.releaseWriteLock();
             }
-        } finally {
-            TopologyManager.releaseWriteLock();
         }
     }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 316984a..96bb38c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -460,13 +460,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                 clusterContext.setVolumes(volumes);
             }
 
-            // Handle member created event
-            TopologyBuilder.handleMemberCreatedEvent(memberContext);
-
             // Persist member context
             CloudControllerContext.getInstance().addMemberContext(memberContext);
             CloudControllerContext.getInstance().persist();
 
+            // Handle member created event
+            TopologyBuilder.handleMemberCreatedEvent(memberContext);
+
             // Start instance in a new thread
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " +